

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Exemples de code pour Amazon Managed Workflows pour Apache Airflow
<a name="sample-code"></a>

Ce guide contient des exemples de code, y compris DAGs des plugins personnalisés, que vous pouvez utiliser dans un environnement Amazon Managed Workflows for Apache Airflow. Pour d'autres exemples d'utilisation d'Apache Airflow avec AWS des services, reportez-vous au [https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags](https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags)répertoire du référentiel Apache GitHub Airflow.

**Topics**
+ [Utilisation d'un DAG pour importer des variables dans la CLI](samples-variables-import.md)
+ [Création d'une connexion SSH à l'aide du `SSHOperator`](samples-ssh.md)
+ [Utilisation d'une clé secrète AWS Secrets Manager pour une connexion Apache Airflow Snowflake](samples-sm-snowflake.md)
+ [Utilisation d'un DAG pour écrire des métriques personnalisées dans CloudWatch](samples-custom-metrics.md)
+ [Nettoyage de base de données Aurora PostgreSQL dans un environnement Amazon MWAA](samples-database-cleanup.md)
+ [Exportation des métadonnées de l'environnement vers des fichiers CSV sur Amazon S3](samples-dag-run-info-to-csv.md)
+ [Utilisation d'une clé secrète AWS Secrets Manager pour une variable Apache Airflow](samples-secrets-manager-var.md)
+ [Utilisation d'une clé secrète AWS Secrets Manager pour une connexion Apache Airflow](samples-secrets-manager.md)
+ [Création d'un plugin personnalisé avec Oracle](samples-oracle.md)
+ [Modifier le fuseau horaire d'un DAG sur Amazon MWAA](samples-plugins-timezone.md)
+ [Actualisation d'un CodeArtifact jeton](samples-code-artifact.md)
+ [Création d'un plugin personnalisé avec Apache Hive et Hadoop](samples-hive.md)
+ [Création d'un plugin personnalisé pour Apache Airflow PythonVirtualenvOperator](samples-virtualenv.md)
+ [Invocation DAGs avec une fonction Lambda](samples-lambda.md)
+ [Invocation DAGs dans différents environnements Amazon MWAA](samples-invoke-dag.md)
+ [Utilisation d'Amazon MWAA avec Amazon RDS pour Microsoft SQL Server](samples-sql-server.md)
+ [Utilisation d'Amazon MWAA avec Amazon EKS](mwaa-eks-example.md)
+ [Connexion à Amazon ECS à l'aide du `ECSOperator`](samples-ecs-operator.md)
+ [Utiliser dbt avec Amazon MWAA](samples-dbt.md)
+ [AWS blogs et tutoriels](#samples-blogs-tutorials)

# Utilisation d'un DAG pour importer des variables dans la CLI
<a name="samples-variables-import"></a>

L'exemple de code suivant importe des variables à l'aide de la CLI sur Amazon Managed Workflows pour Apache Airflow.

**Topics**
+ [Version](#samples-variables-import-version)
+ [Prérequis](#samples-variables-import-prereqs)
+ [Autorisations](#samples-variables-import-permissions)
+ [Dépendances](#samples-variables-import-dependencies)
+ [Exemple de code](#samples-variables-import-code)
+ [Quelle est la prochaine étape ?](#samples-variables-import-next-up)

## Version
<a name="samples-variables-import-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Prérequis
<a name="samples-variables-import-prereqs"></a>

Aucune autorisation supplémentaire n'est requise pour utiliser l'exemple de code présenté sur cette page.

## Autorisations
<a name="samples-variables-import-permissions"></a>

Vos Compte AWS besoins ont accès à la `AmazonMWAAAirflowCliAccess` politique. Pour en savoir plus, reportez-vous à[Politique de la CLI Apache Airflow : Amazon MWAAAirflow CliAccess](access-policies.md).

## Dépendances
<a name="samples-variables-import-dependencies"></a>

Pour utiliser cet exemple de code avec Apache Airflow v2 et versions ultérieures, aucune dépendance supplémentaire n'est requise. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)À utiliser pour installer Apache Airflow.

## Exemple de code
<a name="samples-variables-import-code"></a>

L'exemple de code suivant utilise trois entrées : le nom de votre environnement Amazon MWAA (in`mwaa_env`), le nom Région AWS de votre environnement (in`aws_region`) et le fichier local contenant les variables que vous souhaitez importer (in`var_file`).

```
import boto3
import json
import requests 
import base64
import getopt
import sys

argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''

try:
    opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region'])
    #if len(opts) == 0 and len(opts) > 3:
    if len(opts) != 3:
        print ('Usage: -e MWAA environment -v variable file location and filename -r aws region')
    else:
        for opt, arg in opts:
            if opt in ("-e"):
                mwaa_env=arg
            elif opt in ("-r"):
                aws_region=arg
            elif opt in ("-v"):
                var_file=arg

        boto3.setup_default_session(region_name="{}".format(aws_region))
        mwaa_env_name = "{}".format(mwaa_env)

        client = boto3.client('mwaa')
        mwaa_cli_token = client.create_cli_token(
            Name=mwaa_env_name
        )
        
        with open ("{}".format(var_file), "r") as myfile:
            fileconf = myfile.read().replace('\n', '')

        json_dictionary = json.loads(fileconf)
        for key in json_dictionary:
            print(key, " ", json_dictionary[key])
            val = (key + " " + json_dictionary[key])
            mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
            mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
            raw_data = "variables set {0}".format(val)
            mwaa_response = requests.post(
                mwaa_webserver_hostname,
                headers={
                    'Authorization': mwaa_auth_token,
                    'Content-Type': 'text/plain'
                    },
                data=raw_data
                )
            mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
            mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
            print(mwaa_response.status_code)
            print(mwaa_std_err_message)
            print(mwaa_std_out_message)

except:
    print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
    print("Unexpected error:", sys.exc_info()[0])
    sys.exit(2)
```

## Quelle est la prochaine étape ?
<a name="samples-variables-import-next-up"></a>
+ Découvrez comment télécharger le code DAG dans cet exemple dans le `dags` dossier de votre compartiment Amazon S3 dans[Ajouter ou mettre à jour DAGs](configuring-dag-folder.md).

# Création d'une connexion SSH à l'aide du `SSHOperator`
<a name="samples-ssh"></a>

L'exemple suivant décrit comment vous pouvez utiliser le DAG (`SSHOperator`in a directed acyclic graph) pour vous connecter à une EC2 instance Amazon distante depuis votre environnement Amazon Managed Workflows for Apache Airflow. Vous pouvez utiliser une approche similaire pour vous connecter à n'importe quelle instance distante avec un accès SSH.

Dans l'exemple suivant, vous chargez une clé secrète SSH (`.pem`) dans le `dags` répertoire de votre environnement sur Amazon S3. Ensuite, vous installez les dépendances nécessaires en utilisant `requirements.txt` et en créant une nouvelle connexion Apache Airflow dans l'interface utilisateur. Enfin, vous écrivez un DAG qui crée une connexion SSH avec l'instance distante.

**Topics**
+ [Version](#samples-ssh-version)
+ [Prérequis](#samples-ssh-prereqs)
+ [Autorisations](#samples-ssh-permissions)
+ [Prérequis](#samples-ssh-dependencies)
+ [Copiez votre clé secrète sur Amazon S3](#samples-ssh-secret)
+ [Création d'une nouvelle connexion Apache Airflow](#samples-ssh-connection)
+ [Exemple de code](#samples-ssh-code)

## Version
<a name="samples-ssh-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Prérequis
<a name="samples-ssh-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)
+ Une clé secrète SSH. L'exemple de code suppose que vous disposez d'une EC2 instance Amazon et d'une instance `.pem` située dans la même région que votre environnement Amazon MWAA. Si vous n'avez pas de clé, reportez-vous à la section [Créer ou importer une paire de clés](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair) dans le *guide de EC2 l'utilisateur Amazon*.

## Autorisations
<a name="samples-ssh-permissions"></a>

Aucune autorisation supplémentaire n'est requise pour utiliser l'exemple de code présenté sur cette page.

## Prérequis
<a name="samples-ssh-dependencies"></a>

Ajoutez le paramètre suivant pour `requirements.txt` installer le `apache-airflow-providers-ssh` package sur le serveur Web. Une fois que votre environnement est mis à jour et qu'Amazon MWAA a correctement installé la dépendance, vous obtiendrez un nouveau type de connexion **SSH** dans l'interface utilisateur.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt
apache-airflow-providers-ssh
```

**Note**  
`-c`définit l'URL des contraintes dans`requirements.txt`. Cela garantit qu'Amazon MWAA installe la version de package adaptée à votre environnement.

## Copiez votre clé secrète sur Amazon S3
<a name="samples-ssh-secret"></a>

Utilisez la AWS Command Line Interface commande suivante pour copier votre `.pem` clé dans le `dags` répertoire de votre environnement dans Amazon S3.

```
aws s3 cp your-secret-key.pem s3://amzn-s3-demo-bucket/dags/
```

Amazon MWAA copie le contenu`dags`, y compris la `.pem` clé, dans le `/usr/local/airflow/dags/` répertoire local. Apache Airflow peut ainsi accéder à la clé.

## Création d'une nouvelle connexion Apache Airflow
<a name="samples-ssh-connection"></a>

**Pour créer une nouvelle connexion SSH à l'aide de l'interface utilisateur d'Apache Airflow**

1. Ouvrez la page [Environnements](https://console.aws.amazon.com/mwaa/home#/environments) sur la console Amazon MWAA.

1. Dans la liste des environnements, choisissez **Open Airflow UI** pour votre environnement.

1. **Sur la page de l'interface utilisateur d'Apache Airflow, choisissez **Admin** dans la barre de navigation principale pour développer la liste déroulante, puis sélectionnez Connections.**

1. Sur la page **Lister les connexions**, choisissez **\$1** ou le bouton **Ajouter un nouvel enregistrement** pour ajouter une nouvelle connexion.

1. Sur la page **Ajouter une connexion**, ajoutez les informations suivantes :

   1. Pour **ID de connexion**, entrez**ssh\$1new**.

   1. Pour **Type de connexion**, choisissez **SSH** dans la liste déroulante.
**Note**  
Si le type de connexion **SSH** n'est pas disponible dans la liste, Amazon MWAA n'a pas installé le package requis. `apache-airflow-providers-ssh` Mettez à jour votre `requirements.txt` fichier pour inclure ce package, puis réessayez.

   1. Pour **Host**, entrez l'adresse IP de l' EC2 instance Amazon à laquelle vous souhaitez vous connecter. Par exemple, **12.345.67.89**.

   1. Dans **Nom d'utilisateur**, saisissez **ec2-user** si vous vous connectez à une EC2 instance Amazon. Votre nom d'utilisateur peut être différent en fonction du type d'instance distante à laquelle vous souhaitez qu'Apache Airflow se connecte.

   1. Pour **Extra**, entrez la paire clé-valeur suivante au format JSON :

      ```
      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }
      ```

      Cette paire clé-valeur indique à Apache Airflow de rechercher la clé secrète dans le répertoire local. `/dags`

## Exemple de code
<a name="samples-ssh-code"></a>

Le DAG suivant utilise le `SSHOperator` pour se connecter à votre EC2 instance Amazon cible, puis exécute la commande `hostname` Linux pour imprimer le nom de l'instance. Vous pouvez modifier le DAG pour exécuter n'importe quelle commande ou script sur l'instance distante.

1. Ouvrez un terminal et naviguez jusqu'au répertoire dans lequel votre code DAG est stocké. Exemples :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous`ssh.py`.

   ```
   from airflow.decorators import dag
   from datetime import datetime
   from airflow.providers.ssh.operators.ssh import SSHOperator
   
   @dag(
       dag_id="ssh_operator_example",
       schedule_interval=None,     
       start_date=datetime(2022, 1, 1),
       catchup=False,
       )
   def ssh_dag():
       task_1=SSHOperator(
           task_id="ssh_task",
           ssh_conn_id='ssh_new',
           command='hostname',
       )
   
   my_ssh_dag = ssh_dag()
   ```

1.  Exécutez la AWS CLI commande suivante pour copier le DAG dans le bucket de votre environnement, puis déclenchez le DAG à l'aide de l'interface utilisateur d'Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. En cas de succès, vous obtiendrez un résultat similaire à ce qui suit dans les journaux des `ssh_task` tâches du `ssh_operator_example` DAG :

   ```
   [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
   Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
   [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
   [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
   [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
   [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916
   ```

# Utilisation d'une clé secrète AWS Secrets Manager pour une connexion Apache Airflow Snowflake
<a name="samples-sm-snowflake"></a>

Les exemples d'appels suivants AWS Secrets Manager pour obtenir une clé secrète pour une connexion Apache Airflow Snowflake sur Amazon Managed Workflows for Apache Airflow. Cela suppose que vous avez terminé les étapes de[Configuration d'une connexion Apache Airflow à l'aide d'un secret AWS Secrets Manager](connections-secrets-manager.md).

**Topics**
+ [Version](#samples-sm-snowflake-version)
+ [Prérequis](#samples-sm-snowflake-prereqs)
+ [Autorisations](#samples-sm-snowflake-permissions)
+ [Prérequis](#samples-sm-snowflake-dependencies)
+ [Exemple de code](#samples-sm-snowflake-code)
+ [Quelle est la prochaine étape ?](#samples-sm-snowflake-next-up)

## Version
<a name="samples-sm-snowflake-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Prérequis
<a name="samples-sm-snowflake-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Le backend Secrets Manager en tant qu'option de configuration d'Apache Airflow, comme indiqué dans. [Configuration d'une connexion Apache Airflow à l'aide d'un secret AWS Secrets Manager](connections-secrets-manager.md)
+ Une chaîne de connexion Apache Airflow dans Secrets Manager, comme indiqué dans. [Configuration d'une connexion Apache Airflow à l'aide d'un secret AWS Secrets Manager](connections-secrets-manager.md)

## Autorisations
<a name="samples-sm-snowflake-permissions"></a>
+ Autorisations du Gestionnaire de Secrets Manager répertoriées dans[Configuration d'une connexion Apache Airflow à l'aide d'un secret AWS Secrets Manager](connections-secrets-manager.md).

## Prérequis
<a name="samples-sm-snowflake-dependencies"></a>

Pour utiliser l'exemple de code de cette page, ajoutez les dépendances suivantes à votre`requirements.txt`. Pour en savoir plus, reportez-vous à[Installation des dépendances Python](working-dags-dependencies.md).

```
apache-airflow-providers-snowflake==1.3.0
```

## Exemple de code
<a name="samples-sm-snowflake-code"></a>

Les étapes suivantes décrivent comment créer le code DAG qui appelle Secrets Manager pour obtenir le secret.

1. Dans votre invite de commande, accédez au répertoire dans lequel votre code DAG est stocké. Exemples :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous`snowflake_connection.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   from airflow.utils.dates import days_ago
   
   snowflake_query = [
       """use warehouse "MY_WAREHOUSE";""",
       """select * from "SNOWFLAKE_SAMPLE_DATA"."WEATHER"."WEATHER_14_TOTAL" limit 100;""",
   ]
   
   with DAG(dag_id='snowflake_test', schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       snowflake_select = SnowflakeOperator(
           task_id="snowflake_select",
           sql=snowflake_query,
           snowflake_conn_id="snowflake_conn",
       )
   ```

## Quelle est la prochaine étape ?
<a name="samples-sm-snowflake-next-up"></a>
+ Découvrez comment télécharger le code DAG dans cet exemple dans le `dags` dossier de votre compartiment Amazon S3 dans[Ajouter ou mettre à jour DAGs](configuring-dag-folder.md).

# Utilisation d'un DAG pour écrire des métriques personnalisées dans CloudWatch
<a name="samples-custom-metrics"></a>

Vous pouvez utiliser l'exemple de code suivant pour écrire un graphe acyclique dirigé (DAG) qui exécute un `PythonOperator` afin de récupérer des métriques au niveau du système d'exploitation pour un environnement Amazon MWAA. Le DAG publie ensuite les données sous forme de métriques personnalisées sur Amazon CloudWatch.

Les métriques personnalisées au niveau du système d'exploitation vous offrent une visibilité supplémentaire sur la manière dont les employés de votre environnement utilisent les ressources telles que la mémoire virtuelle et le processeur. Vous pouvez utiliser ces informations pour sélectionner la [classe d'environnement](environment-class.md) la mieux adaptée à votre charge de travail.

**Topics**
+ [Version](#samples-custom-metrics-version)
+ [Prérequis](#samples-custom-metrics-prereqs)
+ [Autorisations](#samples-custom-metrics-permissions)
+ [Dépendances](#samples-custom-metrics-dependencies)
+ [Exemple de code](#samples-custom-metrics-code)

## Version
<a name="samples-custom-metrics-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Prérequis
<a name="samples-custom-metrics-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous avez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)

## Autorisations
<a name="samples-custom-metrics-permissions"></a>

Aucune autorisation supplémentaire n'est requise pour utiliser l'exemple de code présenté sur cette page.

## Dépendances
<a name="samples-custom-metrics-dependencies"></a>
+ Aucune dépendance supplémentaire n'est requise pour utiliser l'exemple de code présenté sur cette page.

## Exemple de code
<a name="samples-custom-metrics-code"></a>

1. Dans votre invite de commande, accédez au dossier dans lequel votre code DAG est stocké. Exemples :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous le nom`dag-custom-metrics.py`. `MWAA-ENV-NAME`Remplacez-le par le nom de votre environnement.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   from datetime import datetime
   import os,json,boto3,psutil,socket
   
   def publish_metric(client,name,value,cat,unit='None'):
       environment_name = os.getenv("MWAA_ENV_NAME")
       value_number=float(value)
       hostname = socket.gethostname()
       ip_address = socket.gethostbyname(hostname)
       print('writing value',value_number,'to metric',name)
       response = client.put_metric_data(
           Namespace='MWAA-Custom',
           MetricData=[
               {
                   'MetricName': name,
                   'Dimensions': [
                       {
                           'Name': 'Environment',
                           'Value': environment_name
                       },
                       {
                           'Name': 'Category',
                           'Value': cat
                       },       
                       {
                           'Name': 'Host',
                           'Value': ip_address
                       },                                     
                   ],
                   'Timestamp': datetime.now(),
                   'Value': value_number,
                   'Unit': unit
               },
           ]
       )
       print(response)
       return response
   
   def python_fn(**kwargs):
       client = boto3.client('cloudwatch')
   
       cpu_stats = psutil.cpu_stats()
       print('cpu_stats', cpu_stats)
   
       virtual = psutil.virtual_memory()
       cpu_times_percent = psutil.cpu_times_percent(interval=0)
   
       publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent')
   
       publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent')
   
       return "OK"
   
   
   with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag:
       t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
   ```

1.  Exécutez la AWS CLI commande suivante pour copier le DAG dans le bucket de votre environnement, puis déclenchez le DAG à l'aide de l'interface utilisateur d'Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Si le DAG fonctionne correctement, vous obtenez quelque chose de similaire à ce qui suit dans vos journaux Apache Airflow :

   ```
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   ...
   [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
   [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
   ```

# Nettoyage de base de données Aurora PostgreSQL dans un environnement Amazon MWAA
<a name="samples-database-cleanup"></a>

Amazon Managed Workflows for Apache Airflow utilise une base de données Aurora PostgreSQL comme base de données de métadonnées Apache Airflow, dans laquelle le DAG s'exécute et les instances de tâches sont stockées. L'exemple de code suivant efface régulièrement les entrées de la base de données Aurora PostgreSQL dédiée à votre environnement Amazon MWAA.

**Topics**
+ [Version](#samples-database-cleanup-version)
+ [Conditions préalables](#samples-database-cleanup-prereqs)
+ [Dépendances](#samples-sql-server-dependencies)
+ [Exemple de code](#samples-database-cleanup-code)

## Version
<a name="samples-database-cleanup-version"></a>

Les exemples de code présentés sur cette page sont spécifiques à Apache Airflow v2 pris en charge sur Amazon MWAA. Reportez-vous aux versions d'[Apache Airflow prises en charge](airflow-versions.md).

**Astuce**  
**Pour les utilisateurs d'Apache Airflow v3** : si vous souhaitez nettoyer une base de données (purger les anciens enregistrements des tables de métastore), exécutez la commande CLI. `db clean`

## Conditions préalables
<a name="samples-database-cleanup-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)

## Dépendances
<a name="samples-sql-server-dependencies"></a>

Pour utiliser cet exemple de code avec Apache Airflow v2, aucune dépendance supplémentaire n'est requise. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)À utiliser pour installer Apache Airflow.

## Exemple de code
<a name="samples-database-cleanup-code"></a>

Le DAG suivant nettoie la base de données de métadonnées pour les tables spécifiées dans`TABLES_TO_CLEAN`. L'exemple supprime les données des tables spécifiées datant de plus de 30 jours. Pour ajuster la date à laquelle les entrées ont été supprimées, définissez `MAX_AGE_IN_DAYS` une valeur différente.

------
#### [ Apache Airflow v2.4 to 2.10.3 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```

------
#### [ Apache Airflow v2.2 and earlier ]

```
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)

def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
```

------

# Exportation des métadonnées de l'environnement vers des fichiers CSV sur Amazon S3
<a name="samples-dag-run-info-to-csv"></a>

Utilisez l'exemple de code suivant pour créer un graphe acyclique dirigé (DAG) qui interroge la base de données pour obtenir une série d'informations d'exécution du DAG et écrit les données dans des `.csv` fichiers stockés sur Amazon S3.

Vous souhaiterez peut-être exporter des informations depuis la base de données Aurora PostgreSQL de votre environnement pour inspecter les données localement, les archiver dans un stockage d'objets ou les combiner avec des outils tels que l'opérateur Amazon [S3 vers Amazon Redshift](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) et le [nettoyage de la base de données, afin de déplacer les](samples-database-cleanup.md) métadonnées Amazon MWAA hors de l'environnement, tout en les préservant pour une analyse future.

Vous pouvez interroger la base de données pour n'importe quel objet répertorié dans les modèles [Apache Airflow](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models). Cet exemple de code utilise trois modèles,`DagRun`, et `TaskFail``TaskInstance`, qui fournissent des informations relatives aux exécutions de To DAG.

**Topics**
+ [Version](#samples-dag-run-info-to-csv-version)
+ [Conditions préalables](#samples-dag-run-info-to-csv-prereqs)
+ [Permissions](#samples-dag-run-info-to-csv-permissions)
+ [Exigences](#samples-dag-run-info-to-csv-dependencies)
+ [Exemple de code](#samples-dag-run-info-to-csv-code)

## Version
<a name="samples-dag-run-info-to-csv-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Conditions préalables
<a name="samples-dag-run-info-to-csv-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)
+ Un [nouveau compartiment Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) dans lequel vous souhaitez exporter vos informations de métadonnées.

## Permissions
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA a besoin d'une autorisation pour que l'action Amazon S3 puisse `s3:PutObject` écrire les informations de métadonnées demandées dans Amazon S3. Ajoutez la déclaration de politique suivante au rôle d'exécution de votre environnement.

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

Cette politique limite l'accès en écriture uniquement à*amzn-s3-demo-bucket*.

## Exigences
<a name="samples-dag-run-info-to-csv-dependencies"></a>

Pour utiliser cet exemple de code avec Apache Airflow v2 et versions ultérieures, aucune dépendance supplémentaire n'est requise. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)À utiliser pour installer Apache Airflow.

## Exemple de code
<a name="samples-dag-run-info-to-csv-code"></a>

Les étapes suivantes décrivent comment créer un DAG qui interroge Aurora PostgreSQL et écrit le résultat dans votre nouveau compartiment Amazon S3.

1. Dans votre terminal, accédez au répertoire dans lequel votre code DAG est enregistré. Par exemple :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous le nom de`metadata_to_csv.py`. Vous pouvez modifier la valeur attribuée `MAX_AGE_IN_DAYS` à pour contrôler l'âge des enregistrements les plus anciens que votre DAG interroge dans la base de données de métadonnées.

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  Exécutez la AWS CLI commande suivante pour copier le DAG dans le bucket de votre environnement, puis déclenchez le DAG à l'aide de l'interface utilisateur d'Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. En cas de réussite, vous obtiendrez un résultat similaire à ce qui suit dans les journaux des tâches associées à la `export_db` tâche :

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Vous pouvez désormais accéder aux `.csv` fichiers exportés et les télécharger dans votre nouveau compartiment Amazon S3 dans`/files/export/`.

# Utilisation d'une clé secrète AWS Secrets Manager pour une variable Apache Airflow
<a name="samples-secrets-manager-var"></a>

Les exemples d'appels suivants AWS Secrets Manager pour obtenir une clé secrète pour une variable Apache Airflow sur Amazon Managed Workflows for Apache Airflow. Cela suppose que vous avez terminé les étapes de[Configuration d'une connexion Apache Airflow à l'aide d'un secret AWS Secrets Manager](connections-secrets-manager.md).

**Topics**
+ [Version](#samples-secrets-manager-var-version)
+ [Prérequis](#samples-secrets-manager-var-prereqs)
+ [Autorisations](#samples-secrets-manager-var-permissions)
+ [Prérequis](#samples-hive-dependencies)
+ [Exemple de code](#samples-secrets-manager-var-code)
+ [Quelle est la prochaine étape ?](#samples-secrets-manager-var-next-up)

## Version
<a name="samples-secrets-manager-var-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Prérequis
<a name="samples-secrets-manager-var-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Le backend Secrets Manager en tant qu'option de configuration d'Apache Airflow, comme indiqué dans. [Configuration d'une connexion Apache Airflow à l'aide d'un secret AWS Secrets Manager](connections-secrets-manager.md)
+ Chaîne de variable Apache Airflow dans Secrets Manager, comme indiqué dans. [Configuration d'une connexion Apache Airflow à l'aide d'un secret AWS Secrets Manager](connections-secrets-manager.md)

## Autorisations
<a name="samples-secrets-manager-var-permissions"></a>
+ Autorisations du Gestionnaire de Secrets Manager répertoriées dans[Configuration d'une connexion Apache Airflow à l'aide d'un secret AWS Secrets Manager](connections-secrets-manager.md).

## Prérequis
<a name="samples-hive-dependencies"></a>

Pour utiliser cet exemple de code avec Apache Airflow v2 et versions ultérieures, aucune dépendance supplémentaire n'est requise. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)À utiliser pour installer Apache Airflow.

## Exemple de code
<a name="samples-secrets-manager-var-code"></a>

Les étapes suivantes décrivent comment créer le code DAG qui appelle Secrets Manager pour obtenir le secret.

1. Dans votre invite de commande, accédez au répertoire dans lequel votre code DAG est stocké. Exemples :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous`secrets-manager-var.py`.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.models import Variable
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   import os
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['airflow@example.com'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   def get_variable_fn(**kwargs):
       my_variable_name = Variable.get("test-variable", default_var="undefined")
       print("my_variable_name: ", my_variable_name)
       return my_variable_name
   with DAG(
       dag_id=DAG_ID,
       default_args=DEFAULT_ARGS,
       dagrun_timeout=timedelta(hours=2),
       start_date=days_ago(1),
       schedule_interval='@once',
       tags=['variable']
   ) as dag:
       get_variable = PythonOperator(
           task_id="get_variable",
           python_callable=get_variable_fn,
           provide_context=True
       )
   ```

## Quelle est la prochaine étape ?
<a name="samples-secrets-manager-var-next-up"></a>
+ Découvrez comment télécharger le code DAG dans cet exemple dans le `dags` dossier de votre compartiment Amazon S3 dans[Ajouter ou mettre à jour DAGs](configuring-dag-folder.md).

# Utilisation d'une clé secrète AWS Secrets Manager pour une connexion Apache Airflow
<a name="samples-secrets-manager"></a>

Les exemples d'appels suivants AWS Secrets Manager pour obtenir une clé secrète pour une connexion Apache Airflow sur Amazon Managed Workflows for Apache Airflow. Cela suppose que vous avez terminé les étapes de[Configuration d'une connexion Apache Airflow à l'aide d'un secret AWS Secrets Manager](connections-secrets-manager.md).

**Topics**
+ [Version](#samples-secrets-manager-version)
+ [Prérequis](#samples-secrets-manager-prereqs)
+ [Autorisations](#samples-secrets-manager-permissions)
+ [Prérequis](#samples-hive-dependencies)
+ [Exemple de code](#samples-secrets-manager-code)
+ [Quelle est la prochaine étape ?](#samples-secrets-manager-next-up)

## Version
<a name="samples-secrets-manager-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Prérequis
<a name="samples-secrets-manager-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Le backend Secrets Manager en tant qu'option de configuration d'Apache Airflow, comme indiqué dans. [Configuration d'une connexion Apache Airflow à l'aide d'un secret AWS Secrets Manager](connections-secrets-manager.md)
+ Une chaîne de connexion Apache Airflow dans Secrets Manager, comme indiqué dans. [Configuration d'une connexion Apache Airflow à l'aide d'un secret AWS Secrets Manager](connections-secrets-manager.md)

## Autorisations
<a name="samples-secrets-manager-permissions"></a>
+ Autorisations du Gestionnaire de Secrets Manager, telles qu'elles sont répertoriées dans[Configuration d'une connexion Apache Airflow à l'aide d'un secret AWS Secrets Manager](connections-secrets-manager.md).

## Prérequis
<a name="samples-hive-dependencies"></a>

Pour utiliser cet exemple de code avec Apache Airflow v2 et versions ultérieures, aucune dépendance supplémentaire n'est requise. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)À utiliser pour installer Apache Airflow.

## Exemple de code
<a name="samples-secrets-manager-code"></a>

Les étapes suivantes décrivent comment créer le code DAG qui appelle Secrets Manager pour obtenir le secret.

1. Dans votre invite de commande, accédez au répertoire dans lequel votre code DAG est stocké. Exemples :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous`secrets-manager.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG, settings, secrets
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   from datetime import timedelta
   import os
   
   ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html
   sm_secretId_name = 'airflow/connections/myconn'
   
   default_args = {
       'owner': 'airflow',
       'start_date': days_ago(1),
       'depends_on_past': False
   }
   
   
   ### Gets the secret myconn from Secrets Manager
   def read_from_aws_sm_fn(**kwargs):
       ### set up Secrets Manager
       hook = AwsBaseHook(client_type='secretsmanager')
       client = hook.get_client_type(region_name='us-east-1')
       response = client.get_secret_value(SecretId=sm_secretId_name)
       myConnSecretString = response["SecretString"]
   
       return myConnSecretString
   
   ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager
   with DAG(
           dag_id=os.path.basename(__file__).replace(".py", ""),
           default_args=default_args,
           dagrun_timeout=timedelta(hours=2),
           start_date=days_ago(1),
           schedule_interval=None
   ) as dag:
       write_all_to_aws_sm = PythonOperator(
           task_id="read_from_aws_sm",
           python_callable=read_from_aws_sm_fn,
           provide_context=True
       )
   ```

## Quelle est la prochaine étape ?
<a name="samples-secrets-manager-next-up"></a>
+ Découvrez comment télécharger le code DAG dans cet exemple dans le `dags` dossier de votre compartiment Amazon S3 dans[Ajouter ou mettre à jour DAGs](configuring-dag-folder.md).

# Création d'un plugin personnalisé avec Oracle
<a name="samples-oracle"></a>

L'exemple suivant explique les étapes de création d'un plug-in personnalisé à l'aide d'Oracle pour Amazon MWAA et peut être combiné avec d'autres plug-ins et binaires personnalisés dans votre fichier plugins.zip.

**Contents**
+ [Version](#samples-oracle-version)
+ [Prérequis](#samples-oracle-prereqs)
+ [Autorisations](#samples-oracle-permissions)
+ [Prérequis](#samples-oracle-dependencies)
+ [Exemple de code](#samples-oracle-code)
+ [Créez le plugin personnalisé](#samples-oracle-create-pluginszip-steps)
  + [Dépendances de téléchargement](#samples-oracle-install)
  + [Plug-in personnalisé](#samples-oracle-plugins-code)
  + [Plugins.zip](#samples-oracle-pluginszip)
+ [Options de configuration d'Airflow](#samples-oracle-airflow-config)
+ [Quelle est la prochaine étape ?](#samples-oracle-next-up)

## Version
<a name="samples-oracle-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Prérequis
<a name="samples-oracle-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)
+ La journalisation des travailleurs est activée à n'importe quel niveau de journal, `CRITICAL` ou dans la section précédente pour votre environnement. Pour plus d'informations sur les types de journaux Amazon MWAA et sur la façon de gérer vos groupes de journaux, consultez [Accès aux journaux Airflow sur Amazon CloudWatch](monitoring-airflow.md)

## Autorisations
<a name="samples-oracle-permissions"></a>

Aucune autorisation supplémentaire n'est requise pour utiliser l'exemple de code présenté sur cette page.

## Prérequis
<a name="samples-oracle-dependencies"></a>

Pour utiliser l'exemple de code de cette page, ajoutez les dépendances suivantes à votre`requirements.txt`. Pour en savoir plus, reportez-vous à[Installation des dépendances Python](working-dags-dependencies.md).

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
cx_Oracle
apache-airflow-providers-oracle
```

## Exemple de code
<a name="samples-oracle-code"></a>

Les étapes suivantes décrivent comment créer le code DAG qui testera le plugin personnalisé.

1. Dans votre invite de commande, accédez au répertoire dans lequel votre code DAG est stocké. Exemples :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous`oracle.py`.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   import os
   import cx_Oracle
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   def testHook(**kwargs):
       cx_Oracle.init_oracle_client()
       version = cx_Oracle.clientversion()
       print("cx_Oracle.clientversion",version)
       return version
   
   with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hook_test = PythonOperator(
           task_id="hook_test",
           python_callable=testHook,
           provide_context=True 
       )
   ```

## Créez le plugin personnalisé
<a name="samples-oracle-create-pluginszip-steps"></a>

Cette section décrit comment télécharger les dépendances, créer le plugin personnalisé et le fichier plugins.zip.

### Dépendances de téléchargement
<a name="samples-oracle-install"></a>

Amazon MWAA extraira le contenu du fichier plugins.zip dans `/usr/local/airflow/plugins` chaque planificateur et conteneur de travail Amazon MWAA. Ceci est utilisé pour ajouter des fichiers binaires à votre environnement. Les étapes suivantes décrivent comment assembler les fichiers nécessaires au plugin personnalisé.

**Extraire l'image du conteneur Amazon Linux**

1. Dans votre invite de commande, extrayez l'image du conteneur Amazon Linux et exécutez le conteneur localement. Exemples :

   ```
   docker pull amazonlinux
   						docker run -it amazonlinux:latest /bin/bash
   ```

   Votre invite de commande peut appeler une ligne de commande bash. Exemples :

   ```
   bash-4.2#
   ```

1. Installez la fonction asynchrone I/O native pour Linux (libaio).

   ```
   yum -y install libaio
   ```

1. Gardez cette fenêtre ouverte pour les étapes suivantes. Nous copierons les fichiers suivants localement :`lib64/libaio.so.1`,`lib64/libaio.so.1.0.0`,`lib64/libaio.so.1.0.1`.

**Télécharger le dossier client**

1. Installez le package de décompression localement. Exemples :

   ```
   sudo yum install unzip
   ```

1. Créez un répertoire `oracle_plugin`. Exemples :

   ```
   mkdir oracle_plugin
   cd oracle_plugin
   ```

1. Utilisez la commande curl suivante pour télécharger le [instantclient-basic-linuxfichier .x64-18.5.0.0.0dbru.zip](https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip) depuis [Oracle](https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html) Instant Client Downloads pour Linux x86-64 (64 bits).

   ```
   curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
   ```

1. Décompressez le fichier `client.zip`. Exemples :

   ```
   unzip *.zip
   ```

**Extraire des fichiers depuis Docker**

1. Dans une nouvelle invite de commande, affichez et notez votre identifiant de conteneur Docker. Exemples :

   ```
   docker container ls
   ```

   Votre invite de commande peut renvoyer tous les conteneurs et leurs IDs. Exemples :

   ```
   debc16fd6970
   ```

1. Dans votre `oracle_plugin` répertoire, extrayez les `lib64/libaio.so.1.0.1` fichiers `lib64/libaio.so.1``lib64/libaio.so.1.0.0`, dans le `instantclient_18_5` dossier local. Exemples :

   ```
   docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/
   ```

### Plug-in personnalisé
<a name="samples-oracle-plugins-code"></a>

Apache Airflow exécutera le contenu des fichiers Python dans le dossier des plugins au démarrage. Ceci est utilisé pour définir et modifier les variables d'environnement. Les étapes suivantes décrivent l'exemple de code du plugin personnalisé.
+ Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous`env_var_plugin_oracle.py`.

  ```
  from airflow.plugins_manager import AirflowPlugin
  import os
  
  os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5'
  os.environ["DPI_DEBUG_LEVEL"]="64"
  
  class EnvVarPlugin(AirflowPlugin):                
      name = 'env_var_plugin'
  ```

### Plugins.zip
<a name="samples-oracle-pluginszip"></a>

Les étapes suivantes expliquent comment créer le`plugins.zip`. Le contenu de cet exemple peut être combiné avec vos autres plugins et fichiers binaires dans un seul `plugins.zip` fichier.

**Compressez le contenu du répertoire du plugin**

1. Dans votre invite de commande, accédez au `oracle_plugin` répertoire. Exemples :

   ```
   cd oracle_plugin
   ```

1. Compressez le `instantclient_18_5` répertoire dans le fichier plugins.zip. Exemples :

   ```
   zip -r ../plugins.zip ./
   ```

   Votre invite de commande affiche :

   ```
   oracle_plugin$ ls
   client.zip		instantclient_18_5
   ```

1. Supprimez le `client.zip` fichier. Exemples :

   ```
   rm client.zip
   ```

**Compressez le fichier env\$1var\$1plugin\$1oracle.py**

1. Ajoutez le `env_var_plugin_oracle.py` fichier à la racine du fichier plugins.zip. Exemples :

   ```
   zip plugins.zip env_var_plugin_oracle.py
   ```

1. Votre fichier plugins.zip inclut désormais les éléments suivants :

   ```
   env_var_plugin_oracle.py
   instantclient_18_5/
   ```

## Options de configuration d'Airflow
<a name="samples-oracle-airflow-config"></a>

Si vous utilisez Apache Airflow v2, ajoutez-le en `core.lazy_load_plugins : False` tant qu'option de configuration d'Apache Airflow. Pour en savoir plus, reportez-vous à la section [Utilisation des options de configuration pour charger des plug-ins en 2](configuring-env-variables.md#configuring-2.0-airflow-override).

## Quelle est la prochaine étape ?
<a name="samples-oracle-next-up"></a>
+ Découvrez comment charger le `requirements.txt` fichier dans cet exemple dans votre compartiment Amazon S3 dans[Installation des dépendances Python](working-dags-dependencies.md).
+ Découvrez comment télécharger le code DAG dans cet exemple dans le `dags` dossier de votre compartiment Amazon S3 dans[Ajouter ou mettre à jour DAGs](configuring-dag-folder.md).
+ Découvrez comment charger le `plugins.zip` fichier dans cet exemple dans votre compartiment Amazon S3 dans[Installation de plugins personnalisés](configuring-dag-import-plugins.md).

# Modifier le fuseau horaire d'un DAG sur Amazon MWAA
<a name="samples-plugins-timezone"></a>

Apache Airflow planifie votre graphe acyclique dirigé (DAG) en UTC\$10 par défaut. [Les étapes suivantes expliquent comment modifier le fuseau horaire dans lequel Amazon MWAA exécute votre appareil DAGs avec Pendulum.](https://pypi.org/project/pendulum/) Cette rubrique explique éventuellement comment créer un plugin personnalisé pour modifier le fuseau horaire des journaux Apache Airflow de votre environnement.

**Topics**
+ [Version](#samples-plugins-timezone-version)
+ [Prérequis](#samples-plugins-timezone-prerequisites)
+ [Permissions](#samples-plugins-timezone-permissions)
+ [Créez un plugin pour modifier le fuseau horaire dans les journaux Airflow](#samples-plugins-timezone-custom-plugin)
+ [Créer une `plugins.zip`](#samples-plugins-timezone-plugins-zip)
+ [Exemple de code](#samples-plugins-timezone-dag)
+ [Quelle est la prochaine étape ?](#samples-plugins-timezone-plugins-next-up)

## Version
<a name="samples-plugins-timezone-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Prérequis
<a name="samples-plugins-timezone-prerequisites"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)

## Permissions
<a name="samples-plugins-timezone-permissions"></a>

Aucune autorisation supplémentaire n'est requise pour utiliser l'exemple de code présenté sur cette page.

## Créez un plugin pour modifier le fuseau horaire dans les journaux Airflow
<a name="samples-plugins-timezone-custom-plugin"></a>

Apache Airflow exécute les fichiers Python du `plugins` répertoire au démarrage. Avec le plugin suivant, vous pouvez remplacer le fuseau horaire de l'exécuteur, qui modifie le fuseau horaire dans lequel Apache Airflow écrit les logs.

1. Créez un répertoire portant le nom `plugins` de votre plugin personnalisé, puis naviguez jusqu'au répertoire. Par exemple :

   ```
   $ mkdir plugins
   $ cd plugins
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement `dag-timezone-plugin.py` dans le `plugins` dossier.

   ```
   import time
   import os
   
   os.environ['TZ'] = 'America/Los_Angeles'
   time.tzset()
   ```

1. Dans le `plugins` répertoire, créez un fichier Python vide nommé`__init__.py`. Votre `plugins` répertoire doit être similaire au suivant :

   ```
   plugins/
    |-- __init__.py
    |-- dag-timezone-plugin.py
   ```

## Créer une `plugins.zip`
<a name="samples-plugins-timezone-plugins-zip"></a>

Les étapes suivantes expliquent comment créer`plugins.zip`. Le contenu de cet exemple peut être combiné avec d'autres plugins et binaires dans un seul `plugins.zip` fichier.

1. Dans votre invite de commande, accédez au `plugins` répertoire de l'étape précédente. Par exemple :

   ```
   cd plugins
   ```

1. Compressez le contenu dans votre `plugins` répertoire.

   ```
   zip -r ../plugins.zip ./
   ```

1. Téléchargez `plugins.zip` dans votre compartiment S3

   ```
   aws s3 cp plugins.zip s3://your-mwaa-bucket/
   ```

## Exemple de code
<a name="samples-plugins-timezone-dag"></a>

Pour modifier le fuseau horaire par défaut (UTC\$10) dans lequel le DAG s'exécute, nous utiliserons une bibliothèque appelée [Pendulum](https://pypi.org/project/pendulum/), une bibliothèque Python permettant de travailler avec des date/heure tenant compte du fuseau horaire.

1. Dans votre invite de commande, accédez au répertoire dans lequel vous DAGs êtes enregistré. Par exemple :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple suivant et enregistrez-le sous`tz-aware-dag.py`.

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from datetime import datetime, timedelta
   # Import the Pendulum library.
   import pendulum
   
   # Instantiate Pendulum and set your timezone.
   local_tz = pendulum.timezone("America/Los_Angeles")
   
   with DAG(
       dag_id = "tz_test",
       schedule_interval="0 12 * * *",
       catchup=False,
       start_date=datetime(2022, 1, 1, tzinfo=local_tz)
   ) as dag:
       bash_operator_task = BashOperator(
           task_id="tz_aware_task",
           dag=dag,
           bash_command="date"
       )
   ```

1.  Exécutez la AWS CLI commande suivante pour copier le DAG dans le bucket de votre environnement, puis déclenchez le DAG à l'aide de l'interface utilisateur d'Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. En cas de succès, vous obtiendrez un résultat similaire à ce qui suit dans les journaux des tâches du `tz_test` DAG : `tz_aware_task`

   ```
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

## Quelle est la prochaine étape ?
<a name="samples-plugins-timezone-plugins-next-up"></a>
+ Découvrez comment charger le `plugins.zip` fichier dans cet exemple dans votre compartiment Amazon S3 dans[Installation de plugins personnalisés](configuring-dag-import-plugins.md).

# Actualisation d'un CodeArtifact jeton
<a name="samples-code-artifact"></a>

Si vous utilisez CodeArtifact pour installer des dépendances Python, Amazon MWAA a besoin d'un jeton actif. Pour autoriser Amazon MWAA à accéder à un CodeArtifact référentiel lors de l'exécution, vous pouvez utiliser un [script de démarrage](using-startup-script.md) et le définir [https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url](https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url)avec le jeton.

La rubrique suivante décrit comment créer un script de démarrage qui utilise l'opération [https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token)CodeArtifact API pour récupérer un nouveau jeton chaque fois que votre environnement démarre ou se met à jour.

**Topics**
+ [Version](#samples-code-artifact-version)
+ [Prérequis](#samples-code-artifact-prereqs)
+ [Autorisations](#samples-code-artifact-permissions)
+ [Exemple de code](#samples-code-artifact-code)
+ [Quelle est la prochaine étape ?](#samples-code-artifact-next-up)

## Version
<a name="samples-code-artifact-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Prérequis
<a name="samples-code-artifact-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)
+ Un [CodeArtifact référentiel dans](https://docs.aws.amazon.com/codeartifact/latest/ug/create-repo.html) lequel vous stockez les dépendances de votre environnement.

## Autorisations
<a name="samples-code-artifact-permissions"></a>

Pour actualiser le CodeArtifact jeton et écrire le résultat dans Amazon S3, Amazon MWAA doit disposer des autorisations suivantes dans le rôle d'exécution.
+ L'`codeartifact:GetAuthorizationToken`action permet à Amazon MWAA de récupérer un nouveau jeton auprès de CodeArtifact. La politique suivante accorde l'autorisation pour chaque CodeArtifact domaine que vous créez. Vous pouvez restreindre davantage l'accès à vos domaines en modifiant la valeur de la ressource dans l'instruction et en spécifiant uniquement les domaines auxquels vous souhaitez que votre environnement accède.

  ```
  {
    "Effect": "Allow",
    "Action": "codeartifact:GetAuthorizationToken",
    "Resource": "arn:aws:codeartifact:us-west-2:*:domain/*"
  }
  ```
+ L'`sts:GetServiceBearerToken`action est requise pour appeler l'opération CodeArtifact [https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html](https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html)API. Cette opération renvoie un jeton qui doit être utilisé lors de l'utilisation d'un gestionnaire de packages tel que `pip` with CodeArtifact. Pour utiliser un gestionnaire de packages avec un CodeArtifact référentiel, le rôle d'exécution de votre environnement doit être autorisé, `sts:GetServiceBearerToken` comme indiqué dans la déclaration de politique suivante.

  ```
  {
    "Sid": "AllowServiceBearerToken",
    "Effect": "Allow",
    "Action": "sts:GetServiceBearerToken",
    "Resource": "*"
  }
  ```

## Exemple de code
<a name="samples-code-artifact-code"></a>

Les étapes suivantes décrivent comment créer un script de démarrage qui met à jour le CodeArtifact jeton.

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous`code_artifact_startup_script.sh`.

   ```
   #!/bin/sh
   
   # Startup script for MWAA, refer to https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html
   
   set -eu
   
   # setup code artifact endpoint and token
   # https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0
   # https://docs.aws.amazon.com/mwaa/latest/userguide/samples-code-artifact.html
   DOMAIN="amazon"
   DOMAIN_OWNER="112233445566"
   REGION="us-west-2"
   REPO_NAME="MyRepo"
   echo "Getting token for CodeArtifact with args: --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER"
   TOKEN=$(aws codeartifact get-authorization-token --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER | jq -r '.authorizationToken')
   echo "Setting Pip env var for '--index-url' to point to CodeArtifact"
   export PIP_EXTRA_INDEX_URL="https://aws:$TOKEN@$DOMAIN-$DOMAIN_OWNER.d.codeartifact.$REGION.amazonaws.com/pypi/$REPO_NAME/simple/"
   echo "CodeArtifact startup setup complete"
   ```

1. Accédez au dossier dans lequel vous avez enregistré le script. `cp`Utilisez-le dans une nouvelle fenêtre d'invite pour télécharger le script dans votre compartiment. Remplacez *amzn-s3-demo-bucket* par vos informations.

   ```
   aws s3 cp code_artifact_startup_script.sh s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   En cas de succès, Amazon S3 affiche le chemin URL de l'objet :

   ```
   upload: ./code_artifact_startup_script.sh to s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   Une fois le script chargé, votre environnement le met à jour et l'exécute au démarrage.

## Quelle est la prochaine étape ?
<a name="samples-code-artifact-next-up"></a>
+ Découvrez comment utiliser des scripts de démarrage pour personnaliser votre environnement dans[Utilisation d’un script de démarrage avec Amazon MWAA](using-startup-script.md).
+ Découvrez comment télécharger le code DAG dans cet exemple dans le `dags` dossier de votre compartiment Amazon S3 dans[Ajouter ou mettre à jour DAGs](configuring-dag-folder.md).
+ Découvrez comment charger le `plugins.zip` fichier dans cet exemple dans votre compartiment Amazon S3 dans[Installation de plugins personnalisés](configuring-dag-import-plugins.md).

# Création d'un plugin personnalisé avec Apache Hive et Hadoop
<a name="samples-hive"></a>

Amazon MWAA extrait le contenu d'un `plugins.zip` à`/usr/local/airflow/plugins`. Cela peut être utilisé pour ajouter des fichiers binaires à vos conteneurs. En outre, Apache Airflow exécute le contenu des fichiers Python contenus dans le `plugins` dossier au *démarrage*, ce qui vous permet de définir et de modifier des variables d'environnement. L'exemple suivant explique les étapes de création d'un plugin personnalisé à l'aide d'Apache Hive et Hadoop dans un environnement Amazon Managed Workflows for Apache Airflow et peut être combiné avec d'autres plugins et binaires personnalisés.

**Topics**
+ [Version](#samples-hive-version)
+ [Prérequis](#samples-hive-prereqs)
+ [Autorisations](#samples-hive-permissions)
+ [Prérequis](#samples-hive-dependencies)
+ [Dépendances de téléchargement](#samples-hive-install)
+ [Plug-in personnalisé](#samples-hive-plugins-code)
+ [Plugins.zip](#samples-hive-pluginszip)
+ [Exemple de code](#samples-hive-code)
+ [Options de configuration d'Airflow](#samples-hive-airflow-config)
+ [Quelle est la prochaine étape ?](#samples-hive-next-up)

## Version
<a name="samples-hive-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Prérequis
<a name="samples-hive-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)

## Autorisations
<a name="samples-hive-permissions"></a>

Aucune autorisation supplémentaire n'est requise pour utiliser l'exemple de code présenté sur cette page.

## Prérequis
<a name="samples-hive-dependencies"></a>

Pour utiliser l'exemple de code de cette page, ajoutez les dépendances suivantes à votre`requirements.txt`. Pour en savoir plus, reportez-vous à[Installation des dépendances Python](working-dags-dependencies.md).

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
apache-airflow-providers-amazon[apache.hive]
```

## Dépendances de téléchargement
<a name="samples-hive-install"></a>

Amazon MWAA extraira le contenu du fichier plugins.zip dans `/usr/local/airflow/plugins` chaque planificateur et conteneur de travail Amazon MWAA. Ceci est utilisé pour ajouter des fichiers binaires à votre environnement. Les étapes suivantes décrivent comment assembler les fichiers nécessaires au plugin personnalisé.

1. Dans votre invite de commande, accédez au répertoire dans lequel vous souhaitez créer votre plugin. Exemples :

   ```
   cd plugins
   ```

1. Téléchargez [Hadoop](https://hadoop.apache.org/) depuis un [miroir](https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz), par exemple :

   ```
   wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
   ```

1. Téléchargez [Hive](https://hive.apache.org/) depuis un [miroir](https://www.apache.org/dyn/closer.cgi/hive/), par exemple :

   ```
   wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
   ```

1. Créez un répertoire. Exemples :

   ```
   mkdir hive_plugin
   ```

1. Extrayez Hadoop.

   ```
   tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
   ```

1. Extrayez Hive.

   ```
   tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin
   ```

## Plug-in personnalisé
<a name="samples-hive-plugins-code"></a>

Apache Airflow exécutera le contenu des fichiers Python dans le dossier des plugins au démarrage. Ceci est utilisé pour définir et modifier les variables d'environnement. Les étapes suivantes décrivent l'exemple de code du plugin personnalisé.

1. Dans votre invite de commande, accédez au `hive_plugin` répertoire. Exemples :

   ```
   cd hive_plugin
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement `hive_plugin.py` dans le `hive_plugin` répertoire.

   ```
   from airflow.plugins_manager import AirflowPlugin
   import os
   os.environ["JAVA_HOME"]="/usr/lib/jvm/jre"
   os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0'
   os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop'
   os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin'
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   class EnvVarPlugin(AirflowPlugin):                
       name = 'hive_plugin'
   ```

1. Copiez le contenu du texte suivant et enregistrez-le localement `.airflowignore` dans le `hive_plugin` répertoire.

   ```
   hadoop-3.3.0
   apache-hive-3.1.2-bin
   ```

## Plugins.zip
<a name="samples-hive-pluginszip"></a>

Les étapes suivantes expliquent comment créer`plugins.zip`. Le contenu de cet exemple peut être combiné avec d'autres plugins et binaires dans un seul `plugins.zip` fichier.

1. Dans votre invite de commande, accédez au `hive_plugin` répertoire de l'étape précédente. Exemples :

   ```
   cd hive_plugin
   ```

1. Compressez le contenu de votre `plugins` dossier.

   ```
   zip -r ../hive_plugin.zip ./
   ```

## Exemple de code
<a name="samples-hive-code"></a>

Les étapes suivantes décrivent comment créer le code DAG qui testera le plugin personnalisé.

1. Dans votre invite de commande, accédez au répertoire dans lequel votre code DAG est stocké. Exemples :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous`hive.py`.

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.utils.dates import days_ago
   
   with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hive_test = BashOperator(
           task_id="hive_test",
           bash_command='hive --help'
       )
   ```

## Options de configuration d'Airflow
<a name="samples-hive-airflow-config"></a>

Si vous utilisez Apache Airflow v2, ajoutez-le en `core.lazy_load_plugins : False` tant qu'option de configuration d'Apache Airflow. Pour en savoir plus, reportez-vous à la section [Utilisation des options de configuration pour charger des plug-ins en 2](configuring-env-variables.md#configuring-2.0-airflow-override).

## Quelle est la prochaine étape ?
<a name="samples-hive-next-up"></a>
+ Découvrez comment charger le `requirements.txt` fichier dans cet exemple dans votre compartiment Amazon S3 dans[Installation des dépendances Python](working-dags-dependencies.md).
+ Découvrez comment télécharger le code DAG dans cet exemple dans le `dags` dossier de votre compartiment Amazon S3 dans[Ajouter ou mettre à jour DAGs](configuring-dag-folder.md).
+ Découvrez comment charger le `plugins.zip` fichier dans cet exemple dans votre compartiment Amazon S3 dans[Installation de plugins personnalisés](configuring-dag-import-plugins.md).

# Création d'un plugin personnalisé pour Apache Airflow PythonVirtualenvOperator
<a name="samples-virtualenv"></a>

L'exemple suivant explique comment appliquer un correctif à Apache Airflow `PythonVirtualenvOperator` avec un plugin personnalisé sur Amazon Managed Workflows for Apache Airflow.

**Topics**
+ [Version](#samples-virtualenv-version)
+ [Prérequis](#samples-virtualenv-prereqs)
+ [Autorisations](#samples-virtualenv-permissions)
+ [Prérequis](#samples-virtualenv-dependencies)
+ [Exemple de code de plugin personnalisé](#samples-virtualenv-plugins-code)
+ [Plugins.zip](#samples-virtualenv-pluginszip)
+ [Exemple de code](#samples-virtualenv-code)
+ [Options de configuration d'Airflow](#samples-virtualenv-airflow-config)
+ [Quelle est la prochaine étape ?](#samples-virtualenv-next-up)

## Version
<a name="samples-virtualenv-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Prérequis
<a name="samples-virtualenv-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)

## Autorisations
<a name="samples-virtualenv-permissions"></a>

Aucune autorisation supplémentaire n'est requise pour utiliser l'exemple de code présenté sur cette page.

## Prérequis
<a name="samples-virtualenv-dependencies"></a>

Pour utiliser l'exemple de code de cette page, ajoutez les dépendances suivantes à votre`requirements.txt`. Pour en savoir plus, reportez-vous à[Installation des dépendances Python](working-dags-dependencies.md).

```
virtualenv
```

## Exemple de code de plugin personnalisé
<a name="samples-virtualenv-plugins-code"></a>

Apache Airflow exécutera le contenu des fichiers Python dans le dossier des plugins au démarrage. Ce plugin corrigera le module intégré `PythonVirtualenvOperator` au cours de ce processus de démarrage pour le rendre compatible avec Amazon MWAA. Les étapes suivantes affichent l'exemple de code du plugin personnalisé.

1. Dans votre invite de commande, accédez au `plugins` répertoire de la section précédente. Exemples :

   ```
   cd plugins
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous`virtual_python_plugin.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow.plugins_manager import AirflowPlugin
   import airflow.utils.python_virtualenv 
   from typing import List
   
   def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
       cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
       if system_site_packages:
           cmd.append('--system-site-packages')
       if python_bin is not None:
           cmd.append(f'--python={python_bin}')
       return cmd
   
   airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
   
   class VirtualPythonPlugin(AirflowPlugin):                
       name = 'virtual_python_plugin'
   ```

## Plugins.zip
<a name="samples-virtualenv-pluginszip"></a>

Les étapes suivantes expliquent comment créer le`plugins.zip`.

1. Dans votre invite de commande, accédez au répertoire figurant `virtual_python_plugin.py` dans la section précédente. Exemples :

   ```
   cd plugins
   ```

1. Compressez le contenu de votre `plugins` dossier.

   ```
   zip plugins.zip virtual_python_plugin.py
   ```

## Exemple de code
<a name="samples-virtualenv-code"></a>

Les étapes suivantes décrivent comment créer le code DAG pour le plugin personnalisé.

1. Dans votre invite de commande, accédez au répertoire dans lequel votre code DAG est stocké. Exemples :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous`virtualenv_test.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.operators.python import PythonVirtualenvOperator
   from airflow.utils.dates import days_ago
   import os
   
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"
   
   def virtualenv_fn():
       import boto3
       print("boto3 version ",boto3.__version__)
   
   with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       virtualenv_task = PythonVirtualenvOperator(
           task_id="virtualenv_task",
           python_callable=virtualenv_fn,
           requirements=["boto3>=1.17.43"],
           system_site_packages=False,
           dag=dag,
       )
   ```

## Options de configuration d'Airflow
<a name="samples-virtualenv-airflow-config"></a>

Si vous utilisez Apache Airflow v2, ajoutez-le en `core.lazy_load_plugins : False` tant qu'option de configuration d'Apache Airflow. Pour en savoir plus, reportez-vous à la section [Utilisation des options de configuration pour charger des plug-ins en 2](configuring-env-variables.md#configuring-2.0-airflow-override).

## Quelle est la prochaine étape ?
<a name="samples-virtualenv-next-up"></a>
+ Découvrez comment charger le `requirements.txt` fichier dans cet exemple dans votre compartiment Amazon S3 dans[Installation des dépendances Python](working-dags-dependencies.md).
+ Découvrez comment télécharger le code DAG dans cet exemple dans le `dags` dossier de votre compartiment Amazon S3 dans[Ajouter ou mettre à jour DAGs](configuring-dag-folder.md).
+ Découvrez comment charger le `plugins.zip` fichier dans cet exemple dans votre compartiment Amazon S3 dans[Installation de plugins personnalisés](configuring-dag-import-plugins.md).

# Invocation DAGs avec une fonction Lambda
<a name="samples-lambda"></a>

L'exemple de code suivant utilise une [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)fonction pour obtenir un jeton CLI Apache Airflow et invoquer un graphe acyclique dirigé (DAG) dans un environnement Amazon MWAA.

**Topics**
+ [Version](#samples-lambda-version)
+ [Conditions préalables](#samples-lambda-prereqs)
+ [Permissions](#samples-lambda-permissions)
+ [Dépendances](#samples-lambda-dependencies)
+ [Exemple de code](#samples-lambda-code)

## Version
<a name="samples-lambda-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Conditions préalables
<a name="samples-lambda-prereqs"></a>

Pour utiliser cet exemple de code, vous devez :
+ Utilisez le [mode d'accès au réseau public](configuring-networking.md#webserver-options-public-network-onconsole) pour votre [environnement Amazon MWAA.](get-started.md)
+ Utilisez une [fonction Lambda utilisant le dernier](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html) environnement d'exécution Python.

**Note**  
Si la fonction Lambda et votre environnement Amazon MWAA se trouvent dans le même VPC, vous pouvez utiliser ce code sur un réseau privé. Pour cette configuration, le rôle d'exécution de la fonction Lambda doit être autorisé à appeler l'opération d'API Amazon Elastic Compute Cloud (Amazon EC2)**CreateNetworkInterface**. Vous pouvez fournir cette autorisation à l'aide de la politique [https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor](https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor) AWS-managed.

## Permissions
<a name="samples-lambda-permissions"></a>

Pour utiliser l'exemple de code présenté sur cette page, le rôle d'exécution de votre environnement Amazon MWAA doit avoir accès pour effectuer l'`airflow:CreateCliToken`action. Vous pouvez fournir cette autorisation à l'aide de la politique `AmazonMWAAAirflowCliAccess` AWS-managed :

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Pour plus d’informations, consultez [Politique de la CLI Apache Airflow : Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Dépendances
<a name="samples-lambda-dependencies"></a>

Pour utiliser cet exemple de code avec Apache Airflow v2 et versions ultérieures, aucune dépendance supplémentaire n'est requise. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)À utiliser pour installer Apache Airflow.

## Exemple de code
<a name="samples-lambda-code"></a>

1. Ouvrez la AWS Lambda console à l'adresse [https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/).

1. Choisissez votre fonction Lambda dans la liste des **fonctions**.

1. Sur la page des fonctions, copiez le code suivant et remplacez-le par le nom de vos ressources :
   + `YOUR_ENVIRONMENT_NAME`— Le nom de votre environnement Amazon MWAA.
   + `YOUR_DAG_NAME`— Le nom du DAG que vous souhaitez invoquer.

   ```
   import boto3
   import http.client
   import base64
   import ast
   mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
   dag_name = 'YOUR_DAG_NAME'
   mwaa_cli_command = 'dags trigger'
   ​
   client = boto3.client('mwaa')
   ​
   def lambda_handler(event, context):
       # get web token
       mwaa_cli_token = client.create_cli_token(
           Name=mwaa_env_name
       )
       
       conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
       payload = mwaa_cli_command + " " + dag_name
       headers = {
         'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
         'Content-Type': 'text/plain'
       }
       conn.request("POST", "/aws_mwaa/cli", payload, headers)
       res = conn.getresponse()
       data = res.read()
       dict_str = data.decode("UTF-8")
       mydata = ast.literal_eval(dict_str)
       return base64.b64decode(mydata['stdout'])
   ```

1. Choisissez **Déployer**.

1. Choisissez **Test** pour appeler votre fonction à l'aide de la console Lambda.

1. Pour vérifier que votre Lambda a correctement appelé votre DAG, utilisez la console Amazon MWAA pour accéder à l'interface utilisateur Apache Airflow de votre environnement, puis procédez comme suit :

   1. Sur la **DAGs**page, localisez votre nouveau DAG cible dans la liste des DAGs.

   1. Sous **Dernière exécution**, vérifiez l'horodatage de la dernière exécution du DAG. Cet horodatage doit correspondre étroitement à l'horodatage le plus récent de votre autre `invoke_dag` environnement.

   1. Sous **Tâches récentes**, vérifiez que la dernière exécution a été réussie.

# Invocation DAGs dans différents environnements Amazon MWAA
<a name="samples-invoke-dag"></a>

L'exemple de code suivant crée un jeton d'interface de ligne de commande Apache Airflow. Le code utilise ensuite un graphe acyclique dirigé (DAG) dans un environnement Amazon MWAA pour appeler un DAG dans un autre environnement Amazon MWAA.

**Topics**
+ [Version](#samples-invoke-dag-version)
+ [Conditions préalables](#samples-invoke-dag-prereqs)
+ [Permissions](#samples-invoke-dag-permissions)
+ [Dépendances](#samples-invoke-dag-dependencies)
+ [Exemple de code](#samples-invoke-dag-code)

## Version
<a name="samples-invoke-dag-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Conditions préalables
<a name="samples-invoke-dag-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous avez besoin des éléments suivants :
+ Deux [environnements Amazon MWAA](get-started.md) avec accès au serveur Web du **réseau public**, y compris votre environnement actuel.
+ Un exemple de DAG chargé dans le bucket Amazon Simple Storage Service (Amazon S3) de votre environnement cible.

## Permissions
<a name="samples-invoke-dag-permissions"></a>

Pour utiliser l'exemple de code présenté sur cette page, le rôle d'exécution de votre environnement doit être autorisé à créer un jeton d'interface de ligne de commande Apache Airflow. Vous pouvez joindre la politique AWS-managed `AmazonMWAAAirflowCliAccess` pour accorder cette autorisation.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Pour plus d’informations, consultez [Politique de la CLI Apache Airflow : Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Dépendances
<a name="samples-invoke-dag-dependencies"></a>

Pour utiliser cet exemple de code avec Apache Airflow v2 et versions ultérieures, aucune dépendance supplémentaire n'est requise. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)À utiliser pour installer Apache Airflow.

## Exemple de code
<a name="samples-invoke-dag-code"></a>

L'exemple de code suivant suppose que vous utilisez un DAG dans votre environnement actuel pour appeler un DAG dans un autre environnement.

1. Dans votre terminal, accédez au répertoire dans lequel votre code DAG est enregistré. Par exemple :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous le nom`invoke_dag.py`. Remplacez les valeurs suivantes par vos informations.
   + `your-new-environment-name`— Le nom de l'autre environnement dans lequel vous souhaitez appeler le DAG.
   + `your-target-dag-id`— L'ID du DAG dans l'autre environnement que vous souhaitez invoquer.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  Exécutez la AWS CLI commande suivante pour copier le DAG dans le bucket de votre environnement, puis déclenchez le DAG à l'aide de l'interface utilisateur d'Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Si le DAG s'exécute correctement, vous obtiendrez un résultat similaire à ce qui suit dans les journaux des tâches pour`invoke_dag_task`.

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Pour vérifier que votre DAG a bien été invoqué, accédez à l'interface utilisateur Apache Airflow de votre nouvel environnement, puis procédez comme suit :

   1. Sur la **DAGs**page, localisez votre nouveau DAG cible dans la liste des DAGs.

   1. Sous **Dernière exécution**, vérifiez l'horodatage de la dernière exécution du DAG. Cet horodatage doit correspondre étroitement à l'horodatage le plus récent de votre autre `invoke_dag` environnement.

   1. Sous **Tâches récentes**, vérifiez que la dernière exécution a été réussie.

# Utilisation d'Amazon MWAA avec Amazon RDS pour Microsoft SQL Server
<a name="samples-sql-server"></a>

Vous pouvez utiliser Amazon Managed Workflows pour Apache Airflow pour vous connecter à un serveur [RDS pour SQL.](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_SQLServer.html) L'exemple de code suivant est utilisé DAGs dans un environnement Amazon Managed Workflows for Apache Airflow pour se connecter et exécuter des requêtes sur un Amazon RDS pour Microsoft SQL Server.

**Topics**
+ [Version](#samples-sql-server-version)
+ [Prérequis](#samples-sql-server-prereqs)
+ [Dépendances](#samples-sql-server-dependencies)
+ [Connexion Apache Airflow v2](#samples-sql-server-conn)
+ [Exemple de code](#samples-sql-server-code)
+ [Quelle est la prochaine étape ?](#samples-sql-server-next-up)

## Version
<a name="samples-sql-server-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Prérequis
<a name="samples-sql-server-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)
+ Amazon MWAA et RDS for SQL Server s'exécutent dans le même Amazon VPC/
+ Les groupes de sécurité VPC d'Amazon MWAA et du serveur sont configurés avec les connexions suivantes :
  + Une règle entrante pour le port `1433` ouvert pour Amazon RDS dans le groupe de sécurité Amazon MWAA
  + Ou une règle sortante pour le port d'`1433`ouverture d'Amazon MWAA vers RDS
+ Apache Airflow Connection for RDS for SQL Server reflète le nom d'hôte, le port, le nom d'utilisateur et le mot de passe de la base de données Amazon RDS SQL Server créée lors du processus précédent.

## Dépendances
<a name="samples-sql-server-dependencies"></a>

Pour utiliser l'exemple de code présenté dans cette section, ajoutez la dépendance suivante à votre`requirements.txt`. Pour en savoir plus, reportez-vous à[Installation des dépendances Python](working-dags-dependencies.md).

```
apache-airflow-providers-microsoft-mssql==1.0.1
			apache-airflow-providers-odbc==1.0.1
			pymssql==2.2.1
```

## Connexion Apache Airflow v2
<a name="samples-sql-server-conn"></a>

Si vous utilisez une connexion dans Apache Airflow v2, assurez-vous que l'objet de connexion Airflow inclut les paires clé-valeur suivantes :

1. **Identifiant de connexion : mssql\$1default**

1. **Type de connexion :** Amazon Web Services

1. **Hôte :** `YOUR_DB_HOST`

1. **Schéma :**

1. **Identifiant :** admin

1. **Mot de passe :**

1. **Hafen :** 1433

1. **Supplémentaire :**

## Exemple de code
<a name="samples-sql-server-code"></a>

1. Dans votre invite de commande, accédez au répertoire dans lequel votre code DAG est stocké. Par exemple :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous`sql-server.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   import pymssql
   import logging
   import sys
   from airflow import DAG
   from datetime import datetime
   from airflow.operators.mssql_operator import MsSqlOperator
   from airflow.operators.python_operator import PythonOperator
   
   default_args = {
       'owner': 'aws',
       'depends_on_past': False,
       'start_date': datetime(2019, 2, 20),
       'provide_context': True
   }
   
   dag = DAG(
       'mssql_conn_example', default_args=default_args, schedule_interval=None)
       
   drop_db = MsSqlOperator(
      task_id="drop_db",
      sql="DROP DATABASE IF EXISTS testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_db = MsSqlOperator(
      task_id="create_db",
      sql="create database testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_table = MsSqlOperator(
      task_id="create_table",
      sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   insert_into_table = MsSqlOperator(
      task_id="insert_into_table",
      sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   def select_pet(**kwargs):
      try:
           conn = pymssql.connect(
               server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com',
               user='admin',
               password='<yoursupersecretpassword>',
               database='testdb'
           )
           
           # Create a cursor from the connection
           cursor = conn.cursor()
           cursor.execute("SELECT * from testdb.dbo.pet")
           row = cursor.fetchone()
           
           if row:
               print(row)
      except:
         logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0])
   
   select_query = PythonOperator(
       task_id='select_query',
       python_callable=select_pet,
       dag=dag,
   )
   
   drop_db >> create_db >> create_table >> insert_into_table >> select_query
   ```

## Quelle est la prochaine étape ?
<a name="samples-sql-server-next-up"></a>
+ Découvrez comment charger le `requirements.txt` fichier dans cet exemple dans votre compartiment Amazon S3 dans[Installation des dépendances Python](working-dags-dependencies.md).
+ Découvrez comment télécharger le code DAG dans cet exemple dans le `dags` dossier de votre compartiment Amazon S3 dans[Ajouter ou mettre à jour DAGs](configuring-dag-folder.md).
+ Explorez des exemples de scripts et d'autres exemples de [modules pymssql](https://pymssql.readthedocs.io/en/stable/pymssql_examples.html).
+ *Pour en savoir plus sur l'exécution de code SQL dans une base de données Microsoft SQL spécifique à l'aide du [mssql\$1operator, consultez le](https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/mssql_operator/index.html?highlight=mssqloperator#airflow.operators.mssql_operator.MsSqlOperator) guide de référence Apache Airflow.*

# Utilisation d'Amazon MWAA avec Amazon EKS
<a name="mwaa-eks-example"></a>

L'exemple suivant montre comment utiliser Amazon Managed Workflows pour Apache Airflow avec Amazon EKS.

**Topics**
+ [Version](#mwaa-eks-example-version)
+ [Conditions préalables](#eksctl-prereqs)
+ [Création d'une clé publique pour Amazon EC2](#eksctl-create-key)
+ [Créer le cluster](#create-cluster-eksctl)
+ [Création d'un espace `mwaa` de noms](#eksctl-namespace)
+ [Création d'un rôle pour l'espace de `mwaa` noms](#eksctl-role)
+ [Création et attachement d'un rôle IAM pour le cluster Amazon EKS](#eksctl-iam-role)
+ [Créez le fichier requirements.txt](#eksctl-requirements)
+ [Création d'un mappage d'identité pour Amazon EKS](#eksctl-identity-map)
+ [Créer le `kubeconfig`](#eksctl-kube-config)
+ [Création d'un DAG](#eksctl-create-dag)
+ [Ajoutez le DAG et `kube_config.yaml` au compartiment Amazon S3](#eksctl-dag-bucket)
+ [Activer et déclencher l'exemple](#eksctl-trigger-pod)

## Version
<a name="mwaa-eks-example-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Conditions préalables
<a name="eksctl-prereqs"></a>

Pour utiliser l'exemple présenté dans cette rubrique, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)
+ eksctl. Pour en savoir plus, reportez-vous à la section [Installer eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html#install-eksctl).
+ kubectl. Pour en savoir plus, reportez-vous à [Installer et configurer kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/). Dans certains cas, cela est installé avec eksctl.
+ Une paire de clés EC2 dans la région où vous créez votre environnement Amazon MWAA. Pour en savoir plus, reportez-vous à la section [Création ou importation d'une paire de clés](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair).

**Note**  
Lorsque vous utilisez une `eksctl` commande, vous pouvez inclure un `--profile` pour spécifier un profil autre que le profil par défaut.

## Création d'une clé publique pour Amazon EC2
<a name="eksctl-create-key"></a>

Utilisez la commande suivante pour créer une clé publique à partir de votre paire de clés privées.

```
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
```

Pour en savoir plus, reportez-vous à la section [Récupération de la clé publique de votre paire de clés](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#retrieving-the-public-key).

## Créer le cluster
<a name="create-cluster-eksctl"></a>

Utilisez la commande suivante pour créer le cluster. Si vous souhaitez donner un nom personnalisé au cluster ou le créer dans une autre région, remplacez le nom et les valeurs de région. Vous devez créer le cluster dans la même région que celle dans laquelle vous créez l'environnement Amazon MWAA. Remplacez les valeurs des sous-réseaux pour qu'elles correspondent aux sous-réseaux de votre réseau Amazon VPC que vous utilisez pour Amazon MWAA. Remplacez la valeur de pour `ssh-public-key` qu'elle corresponde à la clé que vous utilisez. Vous pouvez utiliser une clé existante d'Amazon EC2 qui se trouve dans la même région, ou créer une nouvelle clé dans la même région où vous créez votre environnement Amazon MWAA.

```
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey \
--managed \
--vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \
--vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"
```

La création du cluster prend un certain temps. Une fois terminé, vous pouvez vérifier que le cluster a été créé correctement et que le fournisseur IAM OIDC est configuré à l'aide de la commande suivante :

```
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
```

## Création d'un espace `mwaa` de noms
<a name="eksctl-namespace"></a>

Après avoir confirmé que le cluster a bien été créé, utilisez la commande suivante pour créer un espace de noms pour les pods.

```
kubectl create namespace mwaa
```

## Création d'un rôle pour l'espace de `mwaa` noms
<a name="eksctl-role"></a>

Après avoir créé l'espace de noms, créez un rôle et une liaison de rôles pour un utilisateur Amazon MWAA sur EKS qui peut exécuter des pods dans un espace de noms MWAA. Si vous avez utilisé un autre nom pour l'espace de noms, remplacez mwaa `-n mwaa` par le nom que vous avez utilisé.

```
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: mwaa-role
rules:
  - apiGroups:
  - ""
  - "apps"
  - "batch"
  - "extensions"
resources:      
  - "jobs"
  - "pods"
  - "pods/attach"
			- "pods/exec"
  - "pods/log"
  - "pods/portforward"
  - "secrets"
  - "services"
verbs:
  - "create"
  - "delete"
  - "describe"
  - "get"
  - "list"
  - "patch"
  - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
  subjects:
    - kind: User
  name: mwaa-service
  roleRef:
    kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
```

Vérifiez que le nouveau rôle peut accéder au cluster Amazon EKS en exécutant la commande suivante. Assurez-vous d'utiliser le nom correct si vous n'avez pas utilisé *mwaa* :

```
kubectl get pods -n mwaa --as mwaa-service
```

Vous recevez un message qui dit :

```
No resources found in mwaa namespace.
```

## Création et attachement d'un rôle IAM pour le cluster Amazon EKS
<a name="eksctl-iam-role"></a>

Vous devez créer un rôle IAM, puis le lier au cluster Amazon EKS (k8s) afin qu'il puisse être utilisé pour l'authentification via IAM. Le rôle est uniquement utilisé pour se connecter au cluster et ne dispose d'aucune autorisation pour les appels de console ou d'API.

Créez un nouveau rôle pour l'environnement Amazon MWAA en suivant les étapes décrites dans[Rôle d'exécution Amazon MWAA](mwaa-create-role.md). Toutefois, au lieu de créer et de joindre les politiques décrites dans cette rubrique, associez la stratégie suivante :

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:environment/${MWAA_ENV_NAME}"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
            "arn:aws:logs:us-east-1:111122223333:log-group:airflow-${MWAA_ENV_NAME}-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111122223333:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                    "sqs.us-east-1.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:us-east-1:111122223333:cluster/${EKS_CLUSTER_NAME}"
        }
    ]
}
```

------

Après avoir créé le rôle, modifiez votre environnement Amazon MWAA pour utiliser le rôle que vous avez créé comme rôle d'exécution pour l'environnement. Pour modifier le rôle, modifiez l'environnement à utiliser. Vous sélectionnez le rôle d'exécution sous **Autorisations**.

**Problèmes connus :**
+ Il existe un problème connu lié au fait que les rôles ARNs associés aux sous-chemins ne peuvent pas s'authentifier auprès d'Amazon EKS. La solution consiste à créer le rôle de service manuellement plutôt que d'utiliser celui créé par Amazon MWAA lui-même. Pour en savoir plus, reportez-vous à la section [Les rôles avec des chemins ne fonctionnent pas lorsque le chemin est inclus dans leur ARN dans la carte de configuration aws-auth](https://github.com/kubernetes-sigs/aws-iam-authenticator/issues/268)
+ Si la liste des services Amazon MWAA n'est pas disponible dans IAM, vous devez choisir une autre politique de service, telle qu'Amazon EC2, puis mettre à jour la politique de confiance du rôle pour qu'elle corresponde à ce qui suit :

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": [
            "airflow-env.amazonaws.com",
            "airflow.amazonaws.com"
          ]
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }
  ```

------

  Pour en savoir plus, consultez [Comment utiliser les politiques de confiance avec les rôles IAM](https://aws.amazon.com/blogs/security/how-to-use-trust-policies-with-iam-roles/).

## Créez le fichier requirements.txt
<a name="eksctl-requirements"></a>

Pour utiliser l'exemple de code présenté dans cette section, assurez-vous d'avoir ajouté l'une des options de base de données suivantes à votre`requirements.txt`. Pour en savoir plus, reportez-vous à[Installation des dépendances Python](working-dags-dependencies.md).

```
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
```

## Création d'un mappage d'identité pour Amazon EKS
<a name="eksctl-identity-map"></a>

Utilisez l'ARN du rôle que vous avez créé dans la commande suivante afin de créer un mappage d'identité pour Amazon EKS. Remplacez la région *us-east-1* par la région dans laquelle vous avez créé l'environnement. Remplacez l'ARN du rôle, puis remplacez-le par le *mwaa-execution-role* rôle d'exécution de votre environnement.

```
eksctl create iamidentitymapping \
--region us-east-1 \
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012:role/mwaa-execution-role \
--username mwaa-service
```

## Créer le `kubeconfig`
<a name="eksctl-kube-config"></a>

Utilisez la commande suivante pour créer `kubeconfig` :

```
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
```

Si vous avez utilisé un profil spécifique lors de l'exécution, `update-kubeconfig` vous devez supprimer la `env:` section ajoutée au fichier kube\$1config.yaml afin qu'il fonctionne correctement avec Amazon MWAA. Pour ce faire, supprimez les éléments suivants du fichier, puis enregistrez-le :

```
env:
 - name: AWS_PROFILE
 value: profile_name
```

## Création d'un DAG
<a name="eksctl-create-dag"></a>

Utilisez l'exemple de code suivant pour créer un fichier Python, par exemple `mwaa_pod_example.py` pour le DAG.

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,
                       config_file=kube_config_path,
                       in_cluster=False,
                       cluster_context='aws'
                       )
```

## Ajoutez le DAG et `kube_config.yaml` au compartiment Amazon S3
<a name="eksctl-dag-bucket"></a>

Placez le DAG que vous avez créé et le `kube_config.yaml` fichier dans le compartiment Amazon S3 pour l'environnement Amazon MWAA. Vous pouvez placer des fichiers dans votre compartiment à l'aide de la console Amazon S3 ou du AWS Command Line Interface.

## Activer et déclencher l'exemple
<a name="eksctl-trigger-pod"></a>

Dans Apache Airflow, activez l'exemple, puis déclenchez-le.

Une fois qu'il a été exécuté et terminé avec succès, utilisez la commande suivante pour vérifier le pod :

```
kubectl get pods -n mwaa
```

Vous obtenez un résultat similaire à ce qui suit :

```
NAME READY STATUS RESTARTS AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s
```

Vous pouvez ensuite vérifier la sortie du pod à l'aide de la commande suivante. Remplacez la valeur du nom par la valeur renvoyée par la commande précédente :

```
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888
```

# Connexion à Amazon ECS à l'aide du `ECSOperator`
<a name="samples-ecs-operator"></a>

Cette rubrique décrit comment vous pouvez utiliser le `ECSOperator` pour vous connecter à un conteneur Amazon Elastic Container Service (Amazon ECS) depuis Amazon MWAA. Au cours des étapes suivantes, vous allez ajouter les autorisations requises au rôle d'exécution de votre environnement, utiliser un CloudFormation modèle pour créer un cluster Amazon ECS Fargate, puis créer et télécharger un DAG qui se connecte à votre nouveau cluster.

**Topics**
+ [Version](#samples-ecs-operator-version)
+ [Conditions préalables](#samples-ecs-operator-prereqs)
+ [Permissions](#samples-ecs-operator-permissions)
+ [Création d'un cluster Amazon ECS](#create-cfn-template)
+ [Exemple de code](#samples-ecs-operator-code)

## Version
<a name="samples-ecs-operator-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Conditions préalables
<a name="samples-ecs-operator-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)

## Permissions
<a name="samples-ecs-operator-permissions"></a>
+ Le rôle d'exécution de votre environnement nécessite une autorisation pour exécuter des tâches dans Amazon ECS. Vous pouvez soit associer la politique FullAccess AWS gérée par [AmazonECS\$1](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonECS_FullAccess$jsonEditor) à votre rôle d'exécution, soit créer et associer la politique suivante à votre rôle d'exécution.

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "ecs:RunTask",
                  "ecs:DescribeTasks"
              ],
              "Resource": "*"
          },
          {
              "Action": "iam:PassRole",
              "Effect": "Allow",
              "Resource": [
                  "*"
              ],
              "Condition": {
                  "StringLike": {
                      "iam:PassedToService": "ecs-tasks.amazonaws.com"
                  }
              }
          }
      ]
  }
  ```

------
+ Outre l'ajout des autorisations requises pour exécuter des tâches dans Amazon ECS, vous devez également modifier la déclaration de politique relative aux CloudWatch journaux dans votre rôle d'exécution Amazon MWAA afin de permettre l'accès au groupe de journaux de tâches Amazon ECS, comme indiqué ci-dessous. Le groupe de journaux Amazon ECS est créé par le CloudFormation modèle dans[Création d'un cluster Amazon ECS](#create-cfn-template).

  ```
  {
    "Effect": "Allow",
    "Action": [
      "logs:CreateLogStream",
      "logs:CreateLogGroup",
      "logs:PutLogEvents",
      "logs:GetLogEvents",
      "logs:GetLogRecord",
      "logs:GetLogGroupFields",
      "logs:GetQueryResults"
    ],
    "Resource": [
      "arn:aws:logs:us-east-1:123456789012:log-group:airflow-environment-name-*",
      "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*"
    ]
  }
  ```

Pour plus d'informations sur le rôle d'exécution Amazon MWAA et sur la manière d'associer une politique, reportez-vous à[Rôle d'exécution](mwaa-create-role.md).

## Création d'un cluster Amazon ECS
<a name="create-cfn-template"></a>

À l'aide du CloudFormation modèle suivant, vous allez créer un cluster Amazon ECS Fargate à utiliser avec votre flux de travail Amazon MWAA. Pour plus d'informations, consultez la section [Création d'une définition de tâche](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-task-definition) dans le manuel *Amazon Elastic Container Service Developer Guide*.

1. Créez un fichier JSON avec le code suivant et enregistrez-le sous`ecs-mwaa-cfn.json`.

   ```
   {
       "AWSTemplateFormatVersion": "2010-09-09",
       "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.",
       "Parameters": {
           "VpcId": {
               "Type": "AWS::EC2::VPC::Id",
               "Description": "Select a VPC that allows instances access to ECR, as used with MWAA."
           },
           "SubnetIds": {
               "Type": "List<AWS::EC2::Subnet::Id>",
               "Description": "Select at two private subnets in your selected VPC, as used with MWAA."
           },
           "SecurityGroups": {
               "Type": "List<AWS::EC2::SecurityGroup::Id>",
               "Description": "Select at least one security group in your selected VPC, as used with MWAA."
           }
       },
       "Resources": {
           "Cluster": {
               "Type": "AWS::ECS::Cluster",
               "Properties": {
                   "ClusterName": {
                       "Fn::Sub": "${AWS::StackName}-cluster"
                   }
               }
           },
           "LogGroup": {
               "Type": "AWS::Logs::LogGroup",
               "Properties": {
                   "LogGroupName": {
                       "Ref": "AWS::StackName"
                   },
                   "RetentionInDays": 30
               }
           },
           "ExecutionRole": {
               "Type": "AWS::IAM::Role",
               "Properties": {
                   "AssumeRolePolicyDocument": {
                       "Statement": [
                           {
                               "Effect": "Allow",
                               "Principal": {
                                   "Service": "ecs-tasks.amazonaws.com"
                               },
                               "Action": "sts:AssumeRole"
                           }
                       ]
                   },
                   "ManagedPolicyArns": [
                       "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
                   ]
               }
           },
           "TaskDefinition": {
               "Type": "AWS::ECS::TaskDefinition",
               "Properties": {
                   "Family": {
                       "Fn::Sub": "${AWS::StackName}-task"
                   },
                   "Cpu": 2048,
                   "Memory": 4096,
                   "NetworkMode": "awsvpc",
                   "ExecutionRoleArn": {
                       "Ref": "ExecutionRole"
                   },
                   "ContainerDefinitions": [
                       {
                           "Name": {
                               "Fn::Sub": "${AWS::StackName}-container"
                           },
                           "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
                           "PortMappings": [
                               {
                                   "Protocol": "tcp",
                                   "ContainerPort": 8080,
                                   "HostPort": 8080
                               }
                           ],
                           "LogConfiguration": {
                               "LogDriver": "awslogs",
                               "Options": {
                                   "awslogs-region": {
                                       "Ref": "AWS::Region"
                                   },
                                   "awslogs-group": {
                                       "Ref": "LogGroup"
                                   },
                                   "awslogs-stream-prefix": "ecs"
                               }
                           }
                       }
                   ],
                   "RequiresCompatibilities": [
                       "FARGATE"
                   ]
               }
           },
           "Service": {
               "Type": "AWS::ECS::Service",
               "Properties": {
                   "ServiceName": {
                       "Fn::Sub": "${AWS::StackName}-service"
                   },
                   "Cluster": {
                       "Ref": "Cluster"
                   },
                   "TaskDefinition": {
                       "Ref": "TaskDefinition"
                   },
                   "DesiredCount": 1,
                   "LaunchType": "FARGATE",
                   "PlatformVersion": "1.3.0",
                   "NetworkConfiguration": {
                       "AwsvpcConfiguration": {
                           "AssignPublicIp": "ENABLED",
                           "Subnets": {
                               "Ref": "SubnetIds"
                           },
                           "SecurityGroups": {
                               "Ref": "SecurityGroups"
                           }
                       }
                   }
               }
           }
       }
   }
   ```

1. Dans votre invite de commande, utilisez la AWS CLI commande suivante pour créer une nouvelle pile. Vous devez remplacer les valeurs `SecurityGroups` et par `SubnetIds` des valeurs pour les groupes de sécurité et les sous-réseaux de votre environnement Amazon MWAA.

   ```
   aws cloudformation create-stack \
   --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \
   --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \
   ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \
   --capabilities CAPABILITY_IAM
   ```

   Vous pouvez également utiliser le script shell suivant. Le script récupère les valeurs requises pour les groupes de sécurité et les sous-réseaux de votre environnement à l'aide de la `[get-environment](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/mwaa/get-environment.html)` AWS CLI commande, puis crée la pile en conséquence. Pour exécuter le script, procédez comme suit.

   1. Copiez et enregistrez le script `ecs-stack-helper.sh` dans le même répertoire que votre CloudFormation modèle.

      ```
      #!/bin/bash
      
      joinByString() {
        local separator="$1"
        shift
        local first="$1"
        shift
        printf "%s" "$first" "${@/#/$separator}"
      }
      
      response=$(aws mwaa get-environment --name $1)
      
      securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]')
      subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]'))
      
      aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \
      --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \
      ParameterKey=SubnetIds,ParameterValue=$subnetIds \
      --capabilities CAPABILITY_IAM
      ```

   1. Exécutez le script à l'aide des commandes suivantes. Remplacez `environment-name` et `stack-name` par vos informations.

      ```
      chmod +x ecs-stack-helper.sh
      ./ecs-stack-helper.bash environment-name stack-name
      ```

   En cas de succès, vous vous référerez à la sortie suivante indiquant votre nouvel ID de CloudFormation pile.

   ```
   {
     "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
   }
   ```

Une fois votre CloudFormation stack terminé et AWS approvisionné vos ressources Amazon ECS, vous êtes prêt à créer et à télécharger votre DAG.

## Exemple de code
<a name="samples-ecs-operator-code"></a>

1. Ouvrez une invite de commande et naviguez jusqu'au répertoire dans lequel votre code DAG est stocké. Par exemple :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous le `mwaa-ecs-operator.py` nom, puis téléchargez votre nouveau DAG sur Amazon S3.

   ```
   from http import client
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.ecs import ECSOperator
   from airflow.utils.dates import days_ago
   import boto3
   
   CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information.
   CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information.
   LAUNCH_TYPE="FARGATE"
   
   with DAG(
       dag_id = "ecs_fargate_dag",
       schedule_interval=None,
       catchup=False,
       start_date=days_ago(1)
   ) as dag:
       client=boto3.client('ecs')
       services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE)
       service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns'])
   
       ecs_operator_task = ECSOperator(
           task_id = "ecs_operator_task",
           dag=dag,
           cluster=CLUSTER_NAME,
           task_definition=service['services'][0]['taskDefinition'],
           launch_type=LAUNCH_TYPE,
           overrides={
               "containerOverrides":[
                   {
                       "name":CONTAINER_NAME,
                       "command":["ls", "-l", "/"],
                   },
               ],
           },
   
           network_configuration=service['services'][0]['networkConfiguration'],
           awslogs_group="mwaa-ecs-zero",
           awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}",
       )
   ```
**Note**  
Dans l'exemple DAG, pour`awslogs_group`, vous devrez peut-être modifier le groupe de journaux avec le nom de votre groupe de journaux de tâches Amazon ECS. L'exemple suppose un groupe de journaux nommé`mwaa-ecs-zero`. Pour`awslogs_stream_prefix`, utilisez le préfixe du flux du journal des tâches Amazon ECS. L'exemple suppose un préfixe de flux de journal,`ecs`.

1.  Exécutez la AWS CLI commande suivante pour copier le DAG dans le bucket de votre environnement, puis déclenchez le DAG à l'aide de l'interface utilisateur d'Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. En cas de succès, vous obtiendrez un résultat similaire à ce qui suit dans les journaux des `ecs_operator_task` tâches du `ecs_fargate_dag` DAG :

   ```
   [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
   Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
   [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
   {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
   [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed
   ```

# Utiliser dbt avec Amazon MWAA
<a name="samples-dbt"></a>

Cette rubrique explique comment utiliser dbt et Postgres avec Amazon MWAA. Au cours des étapes suivantes, vous allez ajouter les dépendances requises à votre `requirements.txt` et télécharger un exemple de projet dbt dans le compartiment Amazon S3 de votre environnement. Ensuite, vous utiliserez un exemple de DAG pour vérifier qu'Amazon MWAA a installé les dépendances, puis vous utiliserez le `BashOperator` pour exécuter le projet dbt.

**Topics**
+ [Version](#samples-dbt-version)
+ [Conditions préalables](#samples-dbt-prereqs)
+ [Dépendances](#samples-dbt-dependencies)
+ [Importer un projet dbt sur Amazon S3](#samples-dbt-upload-project)
+ [Utiliser un DAG pour vérifier l'installation de la dépendance dbt](#samples-dbt-test-dependencies)
+ [Utiliser un DAG pour exécuter un projet dbt](#samples-dbt-run-project)

## Version
<a name="samples-dbt-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Conditions préalables
<a name="samples-dbt-prereqs"></a>

Avant de pouvoir effectuer les étapes suivantes, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA](get-started.md) utilisant Apache Airflow v2.2.2. Cet exemple a été écrit et testé avec la version 2.2.2. Vous devrez peut-être modifier l'exemple pour l'utiliser avec d'autres versions d'Apache Airflow.
+ Un exemple de projet de dette. Pour commencer à utiliser dbt avec Amazon MWAA, vous pouvez créer un fork et cloner le [projet de démarrage dbt à partir du référentiel dbt-labs](https://github.com/dbt-labs/dbt-starter-project). GitHub 

## Dépendances
<a name="samples-dbt-dependencies"></a>

Pour utiliser Amazon MWAA avec dbt, ajoutez le script de démarrage suivant à votre environnement. Pour en savoir plus, consultez la section [Utilisation d'un script de démarrage avec Amazon MWAA](using-startup-script.md).

```
#!/bin/bash
			
  if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]]
    then
      exit 0
  fi
			
  echo "------------------------------"
  echo "Installing virtual Python env"
  echo "------------------------------"
			
  pip3 install --upgrade pip
			
  echo "Current Python version:"
  python3 --version 
  echo "..."
			
  sudo pip3 install --user virtualenv
  sudo mkdir python3-virtualenv
  cd python3-virtualenv
  sudo python3 -m venv dbt-env
  sudo chmod -R 777 *
			
  echo "------------------------------"
  echo "Activating venv in"
  $DBT_ENV_PATH
	  		echo "------------------------------"
			
  source dbt-env/bin/activate
  pip3 list
			
  echo "------------------------------"
  echo "Installing libraries..."
  echo "------------------------------"
			
  # do not use sudo, as it will install outside the venv
  pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1
			
  echo "------------------------------"
  echo "Venv libraries..."
  echo "------------------------------"
			
  pip3 list
  dbt --version
			
  echo "------------------------------"
  echo "Deactivating venv..."
  echo "------------------------------"
			
  deactivate
```

Dans les sections suivantes, vous allez télécharger le répertoire de votre projet dbt sur Amazon S3 et exécuter un DAG qui vérifie si Amazon MWAA a correctement installé les dépendances dbt requises.

## Importer un projet dbt sur Amazon S3
<a name="samples-dbt-upload-project"></a>

Pour pouvoir utiliser un projet dbt avec votre environnement Amazon MWAA, vous pouvez télécharger l'intégralité du répertoire du projet dans le dossier de `dags` votre environnement. Lorsque l'environnement est mis à jour, Amazon MWAA télécharge le répertoire dbt dans le dossier local`usr/local/airflow/dags/`.

**Pour télécharger un projet dbt sur Amazon S3**

1. Accédez au répertoire dans lequel vous avez cloné le projet de démarrage dbt.

1. Exécutez la AWS CLI commande Amazon S3 suivante pour copier de manière récursive le contenu du projet dans le `dags` dossier de votre environnement à l'aide du `--recursive` paramètre. La commande crée un sous-répertoire appelé `dbt` que vous pouvez utiliser pour tous vos projets dbt. Si le sous-répertoire existe déjà, les fichiers du projet sont copiés dans le répertoire existant et aucun nouveau répertoire n'est créé. La commande crée également un sous-répertoire dans le `dbt` répertoire pour ce projet de démarrage spécifique.

   ```
   aws s3 cp dbt-starter-project s3://amzn-s3-demo-bucket/dags/dbt/dbt-starter-project --recursive
   ```

   Vous pouvez utiliser différents noms pour les sous-répertoires de projets afin d'organiser plusieurs projets dbt dans le répertoire parent`dbt`.

## Utiliser un DAG pour vérifier l'installation de la dépendance dbt
<a name="samples-dbt-test-dependencies"></a>

Le DAG suivant utilise une commande `BashOperator` et une commande bash pour vérifier si Amazon MWAA a correctement installé les dépendances dbt spécifiées dans. `requirements.txt`

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version"
			)
```

Procédez comme suit pour accéder aux journaux des tâches et vérifier que dbt et ses dépendances ont été installés.

1. Accédez à la console Amazon MWAA, puis choisissez **Open Airflow UI** dans la liste des environnements disponibles.

1. Dans l'interface utilisateur d'Apache Airflow, recherchez le `dbt-installation-test` DAG dans la liste, puis choisissez la date dans la `Last Run` colonne pour ouvrir la dernière tâche réussie.

1. À l'aide de **Graph View**, choisissez la `bash_command` tâche pour ouvrir les détails de l'instance de tâche.

1. Choisissez **Log** pour ouvrir les journaux des tâches, puis vérifiez que les journaux répertorient correctement la version de dbt dans `requirements.txt` laquelle nous avons spécifiée.

## Utiliser un DAG pour exécuter un projet dbt
<a name="samples-dbt-run-project"></a>

Le DAG suivant utilise un `BashOperator` pour copier les projets dbt que vous avez chargés sur Amazon S3 depuis le `usr/local/airflow/dags/` répertoire local vers le `/tmp` répertoire accessible en écriture, puis exécute le projet dbt. Les commandes bash supposent un projet dbt de démarrage intitulé. `dbt-starter-project` Modifiez le nom du répertoire en fonction du nom du répertoire de votre projet.

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			import os
			
			DAG_ID = os.path.basename(__file__).replace(".py", "")
			
			# assumes all files are in a subfolder of DAGs called dbt
			
			with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\
			cp -R /usr/local/airflow/dags/dbt /tmp;\
			echo 'listing project files:';\
			ls -R /tmp;\
			cd /tmp/dbt/mwaa_dbt_test_project;\
			/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\
			cat /tmp/dbt_logs/dbt.log;\
			rm -rf /tmp/dbt/mwaa_dbt_test_project"
			)
```

## AWS blogs et tutoriels
<a name="samples-blogs-tutorials"></a>
+ [Utilisation d'Amazon EKS et Amazon MWAA pour Apache Airflow v2.x](https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12)