

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Esempi di codice per Amazon Managed Workflows for Apache Airflow
<a name="sample-code"></a>

Questa guida contiene esempi di codice, inclusi DAGs plug-in personalizzati, che puoi utilizzare in un ambiente Amazon Managed Workflows for Apache Airflow. Per altri esempi di utilizzo di Apache Airflow AWS con i servizi, consulta [https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags](https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags)la directory nel repository Apache Airflow. GitHub 

**Topics**
+ [Utilizzo di un DAG per importare variabili nella CLI](samples-variables-import.md)
+ [Creazione di una connessione SSH utilizzando `SSHOperator`](samples-ssh.md)
+ [Utilizzo di una chiave segreta Gestione dei segreti AWS per una connessione Apache Airflow Snowflake](samples-sm-snowflake.md)
+ [Utilizzo di un DAG per scrivere metriche personalizzate in CloudWatch](samples-custom-metrics.md)
+ [Pulizia del database Aurora PostgreSQL in un ambiente Amazon MWAA](samples-database-cleanup.md)
+ [Esportazione di metadati ambientali in file CSV su Amazon S3](samples-dag-run-info-to-csv.md)
+ [Utilizzo di una chiave segreta Gestione dei segreti AWS per una variabile Apache Airflow](samples-secrets-manager-var.md)
+ [Utilizzo di una chiave segreta Gestione dei segreti AWS per una connessione Apache Airflow](samples-secrets-manager.md)
+ [Creazione di un plug-in personalizzato con Oracle](samples-oracle.md)
+ [Modifica del fuso orario di un DAG su Amazon MWAA](samples-plugins-timezone.md)
+ [Aggiornamento di un token CodeArtifact](samples-code-artifact.md)
+ [Creazione di un plugin personalizzato con Apache Hive e Hadoop](samples-hive.md)
+ [Creazione di un plugin personalizzato per Apache Airflow PythonVirtualenvOperator](samples-virtualenv.md)
+ [Invocare DAGs con una funzione Lambda](samples-lambda.md)
+ [Richiamo DAGs in diversi ambienti Amazon MWAA](samples-invoke-dag.md)
+ [Utilizzo di Amazon MWAA con Amazon RDS per Microsoft SQL Server](samples-sql-server.md)
+ [Utilizzo di Amazon MWAA con Amazon EKS](mwaa-eks-example.md)
+ [Connessione ad Amazon ECS tramite `ECSOperator`](samples-ecs-operator.md)
+ [Usare dbt con Amazon MWAA](samples-dbt.md)
+ [AWS blog e tutorial](#samples-blogs-tutorials)

# Utilizzo di un DAG per importare variabili nella CLI
<a name="samples-variables-import"></a>

Il codice di esempio seguente importa le variabili utilizzando la CLI su Amazon Managed Workflows for Apache Airflow.

**Topics**
+ [Versione](#samples-variables-import-version)
+ [Prerequisiti](#samples-variables-import-prereqs)
+ [Autorizzazioni](#samples-variables-import-permissions)
+ [Dipendenze](#samples-variables-import-dependencies)
+ [Esempio di codice](#samples-variables-import-code)
+ [Fasi successive](#samples-variables-import-next-up)

## Versione
<a name="samples-variables-import-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-variables-import-prereqs"></a>

Non sono necessarie autorizzazioni aggiuntive per utilizzare l'esempio di codice in questa pagina.

## Autorizzazioni
<a name="samples-variables-import-permissions"></a>

È Account AWS necessario accedere alla `AmazonMWAAAirflowCliAccess` politica. Per ulteriori informazioni, fare riferimento a[Politica della CLI di Apache Airflow: Amazon MWAAAirflow CliAccess](access-policies.md).

## Dipendenze
<a name="samples-variables-import-dependencies"></a>

Per utilizzare questo esempio di codice con Apache Airflow v2 e versioni successive, non sono necessarie dipendenze aggiuntive. Utilizzare [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)per installare Apache Airflow.

## Esempio di codice
<a name="samples-variables-import-code"></a>

Il codice di esempio seguente richiede tre input: il nome dell'ambiente Amazon MWAA (in`mwaa_env`), il nome Regione AWS dell'ambiente (in`aws_region`) e il file locale che contiene le variabili che desideri importare (in). `var_file`

```
import boto3
import json
import requests 
import base64
import getopt
import sys

argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''

try:
    opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region'])
    #if len(opts) == 0 and len(opts) > 3:
    if len(opts) != 3:
        print ('Usage: -e MWAA environment -v variable file location and filename -r aws region')
    else:
        for opt, arg in opts:
            if opt in ("-e"):
                mwaa_env=arg
            elif opt in ("-r"):
                aws_region=arg
            elif opt in ("-v"):
                var_file=arg

        boto3.setup_default_session(region_name="{}".format(aws_region))
        mwaa_env_name = "{}".format(mwaa_env)

        client = boto3.client('mwaa')
        mwaa_cli_token = client.create_cli_token(
            Name=mwaa_env_name
        )
        
        with open ("{}".format(var_file), "r") as myfile:
            fileconf = myfile.read().replace('\n', '')

        json_dictionary = json.loads(fileconf)
        for key in json_dictionary:
            print(key, " ", json_dictionary[key])
            val = (key + " " + json_dictionary[key])
            mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
            mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
            raw_data = "variables set {0}".format(val)
            mwaa_response = requests.post(
                mwaa_webserver_hostname,
                headers={
                    'Authorization': mwaa_auth_token,
                    'Content-Type': 'text/plain'
                    },
                data=raw_data
                )
            mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
            mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
            print(mwaa_response.status_code)
            print(mwaa_std_err_message)
            print(mwaa_std_out_message)

except:
    print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
    print("Unexpected error:", sys.exc_info()[0])
    sys.exit(2)
```

## Fasi successive
<a name="samples-variables-import-next-up"></a>
+ Scopri come caricare il codice DAG in questo esempio nella `dags` cartella del tuo bucket Amazon S3 in. [Aggiungere o aggiornare DAGs](configuring-dag-folder.md)

# Creazione di una connessione SSH utilizzando `SSHOperator`
<a name="samples-ssh"></a>

L'esempio seguente descrive come utilizzare il `SSHOperator` in un grafo aciclico diretto (DAG) per connetterti a un' EC2 istanza Amazon remota dal tuo ambiente Amazon Managed Workflows for Apache Airflow. Puoi utilizzare un approccio simile per connetterti a qualsiasi istanza remota con accesso SSH.

Nell'esempio seguente, carichi una chiave segreta SSH (`.pem`) `dags` nella directory del tuo ambiente su Amazon S3. Quindi, installi le dipendenze necessarie utilizzando `requirements.txt` e creando una nuova connessione Apache Airflow nell'interfaccia utente. Infine, scrivi un DAG che crea una connessione SSH all'istanza remota.

**Topics**
+ [Versione](#samples-ssh-version)
+ [Prerequisiti](#samples-ssh-prereqs)
+ [Autorizzazioni](#samples-ssh-permissions)
+ [Requisiti](#samples-ssh-dependencies)
+ [Copia la tua chiave segreta su Amazon S3](#samples-ssh-secret)
+ [Creare una nuova connessione Apache Airflow](#samples-ssh-connection)
+ [Esempio di codice](#samples-ssh-code)

## Versione
<a name="samples-ssh-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-ssh-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).
+ Una chiave segreta SSH. L'esempio di codice presuppone che tu abbia un' EC2 istanza Amazon e una `.pem` nella stessa regione del tuo ambiente Amazon MWAA. Se non disponi di una chiave, consulta la sezione [Creare o importare una coppia di chiavi](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair) nella *Amazon EC2 User Guide*.

## Autorizzazioni
<a name="samples-ssh-permissions"></a>

Non sono necessarie autorizzazioni aggiuntive per utilizzare l'esempio di codice in questa pagina.

## Requisiti
<a name="samples-ssh-dependencies"></a>

Aggiungi il seguente parametro per `requirements.txt` installare il `apache-airflow-providers-ssh` pacchetto sul server web. Una volta aggiornato l'ambiente e dopo che Amazon MWAA avrà installato correttamente la dipendenza, otterrai un nuovo tipo di connessione **SSH** nell'interfaccia utente.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt
apache-airflow-providers-ssh
```

**Nota**  
`-c`definisce l'URL dei vincoli in. `requirements.txt` Ciò garantisce che Amazon MWAA installi la versione del pacchetto corretta per il tuo ambiente.

## Copia la tua chiave segreta su Amazon S3
<a name="samples-ssh-secret"></a>

Usa il seguente AWS Command Line Interface comando per copiare la `.pem` chiave `dags` nella directory del tuo ambiente in Amazon S3.

```
aws s3 cp your-secret-key.pem s3://amzn-s3-demo-bucket/dags/
```

Amazon MWAA copia il contenuto`dags`, inclusa la `.pem` chiave, nella `/usr/local/airflow/dags/` directory locale. In questo modo, Apache Airflow può accedere alla chiave.

## Creare una nuova connessione Apache Airflow
<a name="samples-ssh-connection"></a>

**Per creare una nuova connessione SSH utilizzando l'interfaccia utente di Apache Airflow**

1. Apri la pagina [Ambienti](https://console.aws.amazon.com/mwaa/home#/environments) sulla console Amazon MWAA.

1. Dall'elenco degli ambienti, scegli **Open Airflow UI** per il tuo ambiente.

1. **Nella pagina dell'interfaccia utente di Apache Airflow, **scegli** Amministratore dalla barra di navigazione principale per espandere l'elenco a discesa, quindi scegli Connessioni.**

1. Nella pagina **Elenco connessioni**, scegli **\$1** o il pulsante **Aggiungi un nuovo record** per aggiungere una nuova connessione.

1. Nella pagina **Aggiungi connessione**, aggiungi le seguenti informazioni:

   1. Per **ID di connessione**, immettere**ssh\$1new**.

   1. Per **Tipo di connessione**, scegli **SSH** dall'elenco a discesa.
**Nota**  
Se il tipo di connessione **SSH** non è disponibile nell'elenco, Amazon MWAA non ha installato il pacchetto richiesto. `apache-airflow-providers-ssh` Aggiorna il `requirements.txt` file per includere questo pacchetto, quindi riprova.

   1. Per **Host**, inserisci l'indirizzo IP dell' EC2 istanza Amazon a cui desideri connetterti. Ad esempio, **12.345.67.89**.

   1. Per **Nome utente**, inserisci **ec2-user** se ti stai connettendo a un' EC2 istanza Amazon. Il tuo nome utente potrebbe essere diverso, a seconda del tipo di istanza remota a cui desideri connettere Apache Airflow.

   1. Per **Extra**, inserisci la seguente coppia chiave-valore in formato JSON:

      ```
      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }
      ```

      Questa coppia chiave-valore indica ad Apache Airflow di cercare la chiave segreta nella directory locale. `/dags`

## Esempio di codice
<a name="samples-ssh-code"></a>

Il seguente DAG utilizza il `SSHOperator` per connettersi all' EC2istanza Amazon di destinazione, quindi esegue il comando `hostname` Linux per stampare il nome dell'istanza. È possibile modificare il DAG per eseguire qualsiasi comando o script sull'istanza remota.

1. Apri un terminale e vai alla directory in cui è memorizzato il codice DAG. Ad esempio:

   ```
   cd dags
   ```

1. Copiate il contenuto del seguente esempio di codice e salvatelo localmente con nome. `ssh.py`

   ```
   from airflow.decorators import dag
   from datetime import datetime
   from airflow.providers.ssh.operators.ssh import SSHOperator
   
   @dag(
       dag_id="ssh_operator_example",
       schedule_interval=None,     
       start_date=datetime(2022, 1, 1),
       catchup=False,
       )
   def ssh_dag():
       task_1=SSHOperator(
           task_id="ssh_task",
           ssh_conn_id='ssh_new',
           command='hostname',
       )
   
   my_ssh_dag = ssh_dag()
   ```

1.  Esegui il AWS CLI comando seguente per copiare il DAG nel bucket del tuo ambiente, quindi attiva il DAG utilizzando l'interfaccia utente di Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. In caso di successo, otterrete un output simile al seguente nei log delle attività del DAG: `ssh_task` `ssh_operator_example`

   ```
   [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
   Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
   [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
   [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
   [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
   [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916
   ```

# Utilizzo di una chiave segreta Gestione dei segreti AWS per una connessione Apache Airflow Snowflake
<a name="samples-sm-snowflake"></a>

I seguenti esempi di chiamate Gestione dei segreti AWS per ottenere una chiave segreta per una connessione Apache Airflow Snowflake su Amazon Managed Workflows for Apache Airflow. Si presuppone che tu abbia completato i passaggi di. [Configurazione di una connessione Apache Airflow utilizzando un segreto Gestione dei segreti AWS](connections-secrets-manager.md)

**Topics**
+ [Versione](#samples-sm-snowflake-version)
+ [Prerequisiti](#samples-sm-snowflake-prereqs)
+ [Autorizzazioni](#samples-sm-snowflake-permissions)
+ [Requisiti](#samples-sm-snowflake-dependencies)
+ [Esempio di codice](#samples-sm-snowflake-code)
+ [Fasi successive](#samples-sm-snowflake-next-up)

## Versione
<a name="samples-sm-snowflake-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-sm-snowflake-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Il backend Secrets Manager come opzione di configurazione Apache Airflow, come elencato in. [Configurazione di una connessione Apache Airflow utilizzando un segreto Gestione dei segreti AWS](connections-secrets-manager.md)
+ Una stringa di connessione Apache Airflow in Secrets Manager, come elencato in. [Configurazione di una connessione Apache Airflow utilizzando un segreto Gestione dei segreti AWS](connections-secrets-manager.md)

## Autorizzazioni
<a name="samples-sm-snowflake-permissions"></a>
+ Autorizzazioni di Secrets Manager elencate in[Configurazione di una connessione Apache Airflow utilizzando un segreto Gestione dei segreti AWS](connections-secrets-manager.md).

## Requisiti
<a name="samples-sm-snowflake-dependencies"></a>

Per utilizzare il codice di esempio in questa pagina, aggiungi le seguenti dipendenze al tuo. `requirements.txt` Per ulteriori informazioni, fare riferimento a. [Installazione delle dipendenze in Python](working-dags-dependencies.md)

```
apache-airflow-providers-snowflake==1.3.0
```

## Esempio di codice
<a name="samples-sm-snowflake-code"></a>

I passaggi seguenti descrivono come creare il codice DAG che richiama Secrets Manager per ottenere il segreto.

1. Nel prompt dei comandi, accedete alla directory in cui è memorizzato il codice DAG. Ad esempio:

   ```
   cd dags
   ```

1. Copiate il contenuto del seguente esempio di codice e salvatelo localmente con nome. `snowflake_connection.py`

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   from airflow.utils.dates import days_ago
   
   snowflake_query = [
       """use warehouse "MY_WAREHOUSE";""",
       """select * from "SNOWFLAKE_SAMPLE_DATA"."WEATHER"."WEATHER_14_TOTAL" limit 100;""",
   ]
   
   with DAG(dag_id='snowflake_test', schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       snowflake_select = SnowflakeOperator(
           task_id="snowflake_select",
           sql=snowflake_query,
           snowflake_conn_id="snowflake_conn",
       )
   ```

## Fasi successive
<a name="samples-sm-snowflake-next-up"></a>
+ Scopri come caricare il codice DAG in questo esempio nella `dags` cartella del tuo bucket Amazon S3 in. [Aggiungere o aggiornare DAGs](configuring-dag-folder.md)

# Utilizzo di un DAG per scrivere metriche personalizzate in CloudWatch
<a name="samples-custom-metrics"></a>

Puoi utilizzare il seguente esempio di codice per scrivere un grafo aciclico diretto (DAG) che esegue un `PythonOperator` per recuperare parametri a livello di sistema operativo per un ambiente Amazon MWAA. Il DAG pubblica quindi i dati come metriche personalizzate su Amazon. CloudWatch

Le metriche personalizzate a livello di sistema operativo offrono una visibilità aggiuntiva sull'utilizzo di risorse come la memoria virtuale e la CPU da parte degli addetti all'ambiente. Puoi utilizzare queste informazioni per selezionare la [classe di ambiente](environment-class.md) più adatta al tuo carico di lavoro.

**Topics**
+ [Versione](#samples-custom-metrics-version)
+ [Prerequisiti](#samples-custom-metrics-prereqs)
+ [Autorizzazioni](#samples-custom-metrics-permissions)
+ [Dipendenze](#samples-custom-metrics-dependencies)
+ [esempio di codice](#samples-custom-metrics-code)

## Versione
<a name="samples-custom-metrics-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-custom-metrics-prereqs"></a>

Per utilizzare l'esempio di codice in questa pagina, è necessario quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).

## Autorizzazioni
<a name="samples-custom-metrics-permissions"></a>

Non sono necessarie autorizzazioni aggiuntive per utilizzare l'esempio di codice in questa pagina.

## Dipendenze
<a name="samples-custom-metrics-dependencies"></a>
+ Non sono necessarie dipendenze aggiuntive per utilizzare l'esempio di codice in questa pagina.

## esempio di codice
<a name="samples-custom-metrics-code"></a>

1. Nel prompt dei comandi, accedete alla cartella in cui è memorizzato il codice DAG. Ad esempio:

   ```
   cd dags
   ```

1. Copia il contenuto del seguente esempio di codice e salvalo localmente come. `dag-custom-metrics.py` `MWAA-ENV-NAME`Sostituiscilo con il nome del tuo ambiente.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   from datetime import datetime
   import os,json,boto3,psutil,socket
   
   def publish_metric(client,name,value,cat,unit='None'):
       environment_name = os.getenv("MWAA_ENV_NAME")
       value_number=float(value)
       hostname = socket.gethostname()
       ip_address = socket.gethostbyname(hostname)
       print('writing value',value_number,'to metric',name)
       response = client.put_metric_data(
           Namespace='MWAA-Custom',
           MetricData=[
               {
                   'MetricName': name,
                   'Dimensions': [
                       {
                           'Name': 'Environment',
                           'Value': environment_name
                       },
                       {
                           'Name': 'Category',
                           'Value': cat
                       },       
                       {
                           'Name': 'Host',
                           'Value': ip_address
                       },                                     
                   ],
                   'Timestamp': datetime.now(),
                   'Value': value_number,
                   'Unit': unit
               },
           ]
       )
       print(response)
       return response
   
   def python_fn(**kwargs):
       client = boto3.client('cloudwatch')
   
       cpu_stats = psutil.cpu_stats()
       print('cpu_stats', cpu_stats)
   
       virtual = psutil.virtual_memory()
       cpu_times_percent = psutil.cpu_times_percent(interval=0)
   
       publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent')
   
       publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent')
   
       return "OK"
   
   
   with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag:
       t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
   ```

1.  Esegui il AWS CLI comando seguente per copiare il DAG nel bucket del tuo ambiente, quindi attiva il DAG utilizzando l'interfaccia utente di Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Se il DAG viene eseguito correttamente, nei log di Apache Airflow si ottiene qualcosa di simile a quanto segue:

   ```
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   ...
   [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
   [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
   ```

# Pulizia del database Aurora PostgreSQL in un ambiente Amazon MWAA
<a name="samples-database-cleanup"></a>

Amazon Managed Workflows for Apache Airflow utilizza un database Aurora PostgreSQL come database di metadati Apache Airflow, in cui vengono eseguite le operazioni DAG e vengono archiviate le istanze delle attività. Il seguente codice di esempio cancella periodicamente le voci dal database Aurora PostgreSQL dedicato per il tuo ambiente Amazon MWAA.

**Topics**
+ [Versione](#samples-database-cleanup-version)
+ [Prerequisiti](#samples-database-cleanup-prereqs)
+ [Dipendenze](#samples-sql-server-dependencies)
+ [Esempio di codice](#samples-database-cleanup-code)

## Versione
<a name="samples-database-cleanup-version"></a>

Gli esempi di codice in questa pagina sono specifici di Apache Airflow v2 supportato su Amazon MWAA. Fai riferimento alle versioni di [Apache Airflow supportate](airflow-versions.md).

**Suggerimento**  
**Per gli utenti di Apache Airflow v3**: se desideri pulire un database (eliminare i vecchi record dalle tabelle dei metastore), esegui il comando CLI. `db clean`

## Prerequisiti
<a name="samples-database-cleanup-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).

## Dipendenze
<a name="samples-sql-server-dependencies"></a>

Per utilizzare questo esempio di codice con Apache Airflow v2, non sono necessarie dipendenze aggiuntive. Utilizzare [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)per installare Apache Airflow.

## Esempio di codice
<a name="samples-database-cleanup-code"></a>

Il seguente DAG pulisce il database dei metadati per le tabelle specificate in. `TABLES_TO_CLEAN` L'esempio elimina i dati dalle tabelle specificate che risalgono a più di 30 giorni. Per modificare la data di eliminazione delle voci, `MAX_AGE_IN_DAYS` impostate un valore diverso.

------
#### [ Apache Airflow v2.4 to 2.10.3 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```

------
#### [ Apache Airflow v2.2 and earlier ]

```
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)

def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
```

------

# Esportazione di metadati ambientali in file CSV su Amazon S3
<a name="samples-dag-run-info-to-csv"></a>

Utilizza il seguente esempio di codice per creare un grafo aciclico diretto (DAG) che interroga il database per una serie di informazioni sull'esecuzione del DAG e scrive i dati in file archiviati `.csv` su Amazon S3.

Potresti voler esportare informazioni dal database Aurora PostgreSQL del tuo ambiente per ispezionare i dati localmente, archiviarli nello storage di oggetti o combinarli con strumenti come l'[operatore Amazon S3 to Amazon Redshift e la [pulizia](samples-database-cleanup.md) del database, per spostare i metadati Amazon MWAA fuori dall'ambiente e](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) conservarli per analisi future.

È possibile interrogare il database per qualsiasi oggetto elencato nei modelli [Apache Airflow](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models). Questo esempio di codice utilizza tre modelli,, e `DagRun` `TaskFail``TaskInstance`, che forniscono informazioni pertinenti alle esecuzioni di DAG.

**Topics**
+ [Versione](#samples-dag-run-info-to-csv-version)
+ [Prerequisiti](#samples-dag-run-info-to-csv-prereqs)
+ [Permissions](#samples-dag-run-info-to-csv-permissions)
+ [Requisiti](#samples-dag-run-info-to-csv-dependencies)
+ [Esempio di codice](#samples-dag-run-info-to-csv-code)

## Versione
<a name="samples-dag-run-info-to-csv-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-dag-run-info-to-csv-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).
+ Un [nuovo bucket Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) in cui esportare le informazioni sui metadati.

## Permissions
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA necessita dell'autorizzazione per l'`s3:PutObject`azione Amazon S3 di scrivere le informazioni sui metadati richieste su Amazon S3. Aggiungi la seguente dichiarazione politica al ruolo di esecuzione del tuo ambiente.

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

Questa politica limita solo l'accesso in scrittura a*amzn-s3-demo-bucket*.

## Requisiti
<a name="samples-dag-run-info-to-csv-dependencies"></a>

Per utilizzare questo esempio di codice con Apache Airflow v2 e versioni successive, non sono necessarie dipendenze aggiuntive. Utilizzare [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)per installare Apache Airflow.

## Esempio di codice
<a name="samples-dag-run-info-to-csv-code"></a>

I passaggi seguenti descrivono come creare un DAG che interroga Aurora PostgreSQL e scrive il risultato nel nuovo bucket Amazon S3.

1. Nel tuo terminale, accedi alla directory in cui è memorizzato il codice DAG. Esempio:

   ```
   cd dags
   ```

1. Copia il contenuto del seguente esempio di codice e salvalo localmente come`metadata_to_csv.py`. È possibile modificare il valore assegnato per `MAX_AGE_IN_DAYS` controllare l'età dei record più vecchi interrogati dal DAG dal database dei metadati.

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  Esegui il AWS CLI comando seguente per copiare il DAG nel bucket del tuo ambiente, quindi attiva il DAG utilizzando l'interfaccia utente di Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. In caso di successo, nei log delle attività dell'operazione verrà generato un risultato simile al seguente: `export_db`

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Ora puoi accedere e scaricare i `.csv` file esportati nel tuo nuovo bucket Amazon S3. `/files/export/`

# Utilizzo di una chiave segreta Gestione dei segreti AWS per una variabile Apache Airflow
<a name="samples-secrets-manager-var"></a>

I seguenti esempi di chiamate Gestione dei segreti AWS per ottenere una chiave segreta per una variabile Apache Airflow su Amazon Managed Workflows for Apache Airflow. Si presuppone che tu abbia completato i passaggi di. [Configurazione di una connessione Apache Airflow utilizzando un segreto Gestione dei segreti AWS](connections-secrets-manager.md)

**Topics**
+ [Versione](#samples-secrets-manager-var-version)
+ [Prerequisiti](#samples-secrets-manager-var-prereqs)
+ [Autorizzazioni](#samples-secrets-manager-var-permissions)
+ [Requisiti](#samples-hive-dependencies)
+ [Esempio di codice](#samples-secrets-manager-var-code)
+ [Fasi successive](#samples-secrets-manager-var-next-up)

## Versione
<a name="samples-secrets-manager-var-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-secrets-manager-var-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Il backend Secrets Manager come opzione di configurazione Apache Airflow, come elencato in. [Configurazione di una connessione Apache Airflow utilizzando un segreto Gestione dei segreti AWS](connections-secrets-manager.md)
+ Una stringa variabile Apache Airflow in Secrets Manager, come elencato in. [Configurazione di una connessione Apache Airflow utilizzando un segreto Gestione dei segreti AWS](connections-secrets-manager.md)

## Autorizzazioni
<a name="samples-secrets-manager-var-permissions"></a>
+ Autorizzazioni di Secrets Manager elencate in[Configurazione di una connessione Apache Airflow utilizzando un segreto Gestione dei segreti AWS](connections-secrets-manager.md).

## Requisiti
<a name="samples-hive-dependencies"></a>

Per utilizzare questo esempio di codice con Apache Airflow v2 e versioni successive, non sono necessarie dipendenze aggiuntive. Utilizzare [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)per installare Apache Airflow.

## Esempio di codice
<a name="samples-secrets-manager-var-code"></a>

I passaggi seguenti descrivono come creare il codice DAG che chiama Secrets Manager per ottenere il segreto.

1. Nel prompt dei comandi, accedete alla directory in cui è memorizzato il codice DAG. Ad esempio:

   ```
   cd dags
   ```

1. Copia il contenuto del seguente esempio di codice e salvalo localmente come. `secrets-manager-var.py`

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.models import Variable
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   import os
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['airflow@example.com'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   def get_variable_fn(**kwargs):
       my_variable_name = Variable.get("test-variable", default_var="undefined")
       print("my_variable_name: ", my_variable_name)
       return my_variable_name
   with DAG(
       dag_id=DAG_ID,
       default_args=DEFAULT_ARGS,
       dagrun_timeout=timedelta(hours=2),
       start_date=days_ago(1),
       schedule_interval='@once',
       tags=['variable']
   ) as dag:
       get_variable = PythonOperator(
           task_id="get_variable",
           python_callable=get_variable_fn,
           provide_context=True
       )
   ```

## Fasi successive
<a name="samples-secrets-manager-var-next-up"></a>
+ Scopri come caricare il codice DAG in questo esempio nella `dags` cartella del tuo bucket Amazon S3 in. [Aggiungere o aggiornare DAGs](configuring-dag-folder.md)

# Utilizzo di una chiave segreta Gestione dei segreti AWS per una connessione Apache Airflow
<a name="samples-secrets-manager"></a>

I seguenti esempi di chiamate Gestione dei segreti AWS per ottenere una chiave segreta per una connessione Apache Airflow su Amazon Managed Workflows for Apache Airflow. Si presuppone che tu abbia completato i passaggi di. [Configurazione di una connessione Apache Airflow utilizzando un segreto Gestione dei segreti AWS](connections-secrets-manager.md)

**Topics**
+ [Versione](#samples-secrets-manager-version)
+ [Prerequisiti](#samples-secrets-manager-prereqs)
+ [Autorizzazioni](#samples-secrets-manager-permissions)
+ [Requisiti](#samples-hive-dependencies)
+ [Esempio di codice](#samples-secrets-manager-code)
+ [Fasi successive](#samples-secrets-manager-next-up)

## Versione
<a name="samples-secrets-manager-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-secrets-manager-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Il backend Secrets Manager come opzione di configurazione Apache Airflow, come elencato in. [Configurazione di una connessione Apache Airflow utilizzando un segreto Gestione dei segreti AWS](connections-secrets-manager.md)
+ Una stringa di connessione Apache Airflow in Secrets Manager, come elencato in. [Configurazione di una connessione Apache Airflow utilizzando un segreto Gestione dei segreti AWS](connections-secrets-manager.md)

## Autorizzazioni
<a name="samples-secrets-manager-permissions"></a>
+ Autorizzazioni di Secrets Manager elencate in[Configurazione di una connessione Apache Airflow utilizzando un segreto Gestione dei segreti AWS](connections-secrets-manager.md).

## Requisiti
<a name="samples-hive-dependencies"></a>

Per utilizzare questo esempio di codice con Apache Airflow v2 e versioni successive, non sono necessarie dipendenze aggiuntive. Utilizzare [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)per installare Apache Airflow.

## Esempio di codice
<a name="samples-secrets-manager-code"></a>

I passaggi seguenti descrivono come creare il codice DAG che richiama Secrets Manager per ottenere il segreto.

1. Nel prompt dei comandi, accedete alla directory in cui è memorizzato il codice DAG. Ad esempio:

   ```
   cd dags
   ```

1. Copiate il contenuto del seguente esempio di codice e salvatelo localmente con nome. `secrets-manager.py`

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG, settings, secrets
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   from datetime import timedelta
   import os
   
   ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html
   sm_secretId_name = 'airflow/connections/myconn'
   
   default_args = {
       'owner': 'airflow',
       'start_date': days_ago(1),
       'depends_on_past': False
   }
   
   
   ### Gets the secret myconn from Secrets Manager
   def read_from_aws_sm_fn(**kwargs):
       ### set up Secrets Manager
       hook = AwsBaseHook(client_type='secretsmanager')
       client = hook.get_client_type(region_name='us-east-1')
       response = client.get_secret_value(SecretId=sm_secretId_name)
       myConnSecretString = response["SecretString"]
   
       return myConnSecretString
   
   ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager
   with DAG(
           dag_id=os.path.basename(__file__).replace(".py", ""),
           default_args=default_args,
           dagrun_timeout=timedelta(hours=2),
           start_date=days_ago(1),
           schedule_interval=None
   ) as dag:
       write_all_to_aws_sm = PythonOperator(
           task_id="read_from_aws_sm",
           python_callable=read_from_aws_sm_fn,
           provide_context=True
       )
   ```

## Fasi successive
<a name="samples-secrets-manager-next-up"></a>
+ Scopri come caricare il codice DAG in questo esempio nella `dags` cartella del tuo bucket Amazon S3 in. [Aggiungere o aggiornare DAGs](configuring-dag-folder.md)

# Creazione di un plug-in personalizzato con Oracle
<a name="samples-oracle"></a>

L'esempio seguente illustra i passaggi per creare un plug-in personalizzato utilizzando Oracle per Amazon MWAA e può essere combinato con altri plug-in e binari personalizzati nel file plugins.zip.

**Contents**
+ [Versione](#samples-oracle-version)
+ [Prerequisiti](#samples-oracle-prereqs)
+ [Autorizzazioni](#samples-oracle-permissions)
+ [Requisiti](#samples-oracle-dependencies)
+ [Esempio di codice](#samples-oracle-code)
+ [Create il plugin personalizzato](#samples-oracle-create-pluginszip-steps)
  + [Scarica le dipendenze](#samples-oracle-install)
  + [Plugin personalizzato](#samples-oracle-plugins-code)
  + [Plugins.zip](#samples-oracle-pluginszip)
+ [Opzioni di configurazione di Airflow](#samples-oracle-airflow-config)
+ [Fasi successive](#samples-oracle-next-up)

## Versione
<a name="samples-oracle-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-oracle-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).
+ La registrazione dei lavoratori è abilitata a qualsiasi livello di registro `CRITICAL` o nella sezione precedente relativa all'ambiente in uso. Per ulteriori informazioni sui tipi di log di Amazon MWAA e su come gestire i gruppi di log, consulta [Accesso ai log Airflow in Amazon CloudWatch](monitoring-airflow.md)

## Autorizzazioni
<a name="samples-oracle-permissions"></a>

Non sono necessarie autorizzazioni aggiuntive per utilizzare l'esempio di codice in questa pagina.

## Requisiti
<a name="samples-oracle-dependencies"></a>

Per utilizzare il codice di esempio in questa pagina, aggiungi le seguenti dipendenze al tuo. `requirements.txt` Per ulteriori informazioni, fare riferimento a. [Installazione delle dipendenze in Python](working-dags-dependencies.md)

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
cx_Oracle
apache-airflow-providers-oracle
```

## Esempio di codice
<a name="samples-oracle-code"></a>

I passaggi seguenti descrivono come creare il codice DAG che testerà il plug-in personalizzato.

1. Nel prompt dei comandi, accedete alla directory in cui è memorizzato il codice DAG. Ad esempio:

   ```
   cd dags
   ```

1. Copiate il contenuto del seguente esempio di codice e salvatelo localmente con nome. `oracle.py`

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   import os
   import cx_Oracle
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   def testHook(**kwargs):
       cx_Oracle.init_oracle_client()
       version = cx_Oracle.clientversion()
       print("cx_Oracle.clientversion",version)
       return version
   
   with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hook_test = PythonOperator(
           task_id="hook_test",
           python_callable=testHook,
           provide_context=True 
       )
   ```

## Create il plugin personalizzato
<a name="samples-oracle-create-pluginszip-steps"></a>

Questa sezione descrive come scaricare le dipendenze, creare il plugin personalizzato e il file plugins.zip.

### Scarica le dipendenze
<a name="samples-oracle-install"></a>

Amazon MWAA estrarrà il contenuto di plugins.zip in `/usr/local/airflow/plugins` ogni container di scheduler e worker Amazon MWAA. Viene utilizzato per aggiungere file binari al tuo ambiente. I passaggi seguenti descrivono come assemblare i file necessari per il plug-in personalizzato.

**Estrai l'immagine del contenitore Amazon Linux**

1. Nel prompt dei comandi, recupera l'immagine del contenitore Amazon Linux ed esegui il contenitore localmente. Ad esempio:

   ```
   docker pull amazonlinux
   						docker run -it amazonlinux:latest /bin/bash
   ```

   Il prompt dei comandi può richiamare una riga di comando bash. Ad esempio:

   ```
   bash-4.2#
   ```

1. Installa la funzionalità asincrona nativa di Linux (libaio). I/O 

   ```
   yum -y install libaio
   ```

1. Tieni aperta questa finestra per i passaggi successivi. Copieremo localmente i seguenti file:`lib64/libaio.so.1`,`lib64/libaio.so.1.0.0`,`lib64/libaio.so.1.0.1`.

**Scarica la cartella del client**

1. Installa il pacchetto unzip localmente. Ad esempio:

   ```
   sudo yum install unzip
   ```

1. Crea una directory `oracle_plugin`. Ad esempio:

   ```
   mkdir oracle_plugin
   cd oracle_plugin
   ```

1. [Utilizzare il seguente comando curl per scaricare il file [instantclient-basic-linux.x64-18.5.0.0.0dbru.zip da Oracle Instant Client Downloads per Linux x86-64](https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip) (64 bit).](https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html)

   ```
   curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
   ```

1. Decomprimi il file `client.zip`. Ad esempio:

   ```
   unzip *.zip
   ```

**Estrai file da Docker**

1. In un nuovo prompt dei comandi, mostra e annota l'ID del contenitore Docker. Ad esempio:

   ```
   docker container ls
   ```

   Il prompt dei comandi può restituire tutti i contenitori e i relativi. IDs Ad esempio:

   ```
   debc16fd6970
   ```

1. Nella `oracle_plugin` directory, estrai i `lib64/libaio.so.1.0.1` file `lib64/libaio.so.1``lib64/libaio.so.1.0.0`,, `instantclient_18_5` nella cartella locale. Ad esempio:

   ```
   docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/
   ```

### Plugin personalizzato
<a name="samples-oracle-plugins-code"></a>

Apache Airflow eseguirà il contenuto dei file Python nella cartella plugins all'avvio. Viene utilizzato per impostare e modificare le variabili di ambiente. I passaggi seguenti descrivono il codice di esempio per il plug-in personalizzato.
+ Copiate il contenuto del seguente esempio di codice e salvatelo localmente con nome`env_var_plugin_oracle.py`.

  ```
  from airflow.plugins_manager import AirflowPlugin
  import os
  
  os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5'
  os.environ["DPI_DEBUG_LEVEL"]="64"
  
  class EnvVarPlugin(AirflowPlugin):                
      name = 'env_var_plugin'
  ```

### Plugins.zip
<a name="samples-oracle-pluginszip"></a>

I passaggi seguenti spiegano come creare il`plugins.zip`. Il contenuto di questo esempio può essere combinato con gli altri plugin e binari in un unico `plugins.zip` file.

**Comprimi il contenuto della cartella dei plugin**

1. Nel prompt dei comandi, accedi alla `oracle_plugin` directory. Ad esempio:

   ```
   cd oracle_plugin
   ```

1. Comprimi la `instantclient_18_5` directory in plugins.zip. Ad esempio:

   ```
   zip -r ../plugins.zip ./
   ```

   Il prompt dei comandi visualizza:

   ```
   oracle_plugin$ ls
   client.zip		instantclient_18_5
   ```

1. Rimuovi il `client.zip` file. Ad esempio:

   ```
   rm client.zip
   ```

**Comprimi il file env\$1var\$1plugin\$1oracle.py**

1. Aggiungi il `env_var_plugin_oracle.py` file alla radice del file plugins.zip. Ad esempio:

   ```
   zip plugins.zip env_var_plugin_oracle.py
   ```

1. Il tuo plugins.zip ora include quanto segue:

   ```
   env_var_plugin_oracle.py
   instantclient_18_5/
   ```

## Opzioni di configurazione di Airflow
<a name="samples-oracle-airflow-config"></a>

Se utilizzi Apache Airflow v2, `core.lazy_load_plugins : False` aggiungilo come opzione di configurazione Apache Airflow. Per ulteriori informazioni, consulta [Usare le opzioni di configurazione per caricare i plugin in](configuring-env-variables.md#configuring-2.0-airflow-override) 2.

## Fasi successive
<a name="samples-oracle-next-up"></a>
+ Scopri come caricare il `requirements.txt` file in questo esempio nel tuo bucket Amazon S3 in. [Installazione delle dipendenze in Python](working-dags-dependencies.md)
+ Scopri come caricare il codice DAG in questo esempio nella `dags` cartella del tuo bucket Amazon S3 in. [Aggiungere o aggiornare DAGs](configuring-dag-folder.md)
+ Scopri di più su come caricare il `plugins.zip` file in questo esempio nel tuo bucket Amazon S3 in. [Installazione di plugin personalizzati](configuring-dag-import-plugins.md)

# Modifica del fuso orario di un DAG su Amazon MWAA
<a name="samples-plugins-timezone"></a>

Apache Airflow pianifica il grafo aciclico diretto (DAG) in UTC\$10 per impostazione predefinita. [I passaggi seguenti illustrano come modificare il fuso orario in cui Amazon MWAA esegue Pendulum. DAGs ](https://pypi.org/project/pendulum/) Facoltativamente, questo argomento dimostra come creare un plug-in personalizzato per modificare il fuso orario per i log Apache Airflow del tuo ambiente.

**Topics**
+ [Versione](#samples-plugins-timezone-version)
+ [Prerequisiti](#samples-plugins-timezone-prerequisites)
+ [Permissions](#samples-plugins-timezone-permissions)
+ [Crea un plug-in per modificare il fuso orario nei registri di Airflow](#samples-plugins-timezone-custom-plugin)
+ [Creazione di una `plugins.zip`](#samples-plugins-timezone-plugins-zip)
+ [Esempio di codice](#samples-plugins-timezone-dag)
+ [Fasi successive](#samples-plugins-timezone-plugins-next-up)

## Versione
<a name="samples-plugins-timezone-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-plugins-timezone-prerequisites"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).

## Permissions
<a name="samples-plugins-timezone-permissions"></a>

Non sono necessarie autorizzazioni aggiuntive per utilizzare l'esempio di codice in questa pagina.

## Crea un plug-in per modificare il fuso orario nei registri di Airflow
<a name="samples-plugins-timezone-custom-plugin"></a>

Apache Airflow esegue i file Python nella directory all'avvio. `plugins` Con il seguente plugin, puoi sovrascrivere il fuso orario dell'esecutore, che modifica il fuso orario in cui Apache Airflow scrive i log.

1. Crea una directory con il nome del tuo plugin personalizzato e accedi `plugins` alla directory. Ad esempio:

   ```
   $ mkdir plugins
   $ cd plugins
   ```

1. Copiate il contenuto del seguente esempio di codice e salvatelo localmente come `dag-timezone-plugin.py` nella `plugins` cartella.

   ```
   import time
   import os
   
   os.environ['TZ'] = 'America/Los_Angeles'
   time.tzset()
   ```

1. Nella `plugins` directory, crea un file Python vuoto denominato. `__init__.py` La tua `plugins` directory dovrebbe essere simile alla seguente:

   ```
   plugins/
    |-- __init__.py
    |-- dag-timezone-plugin.py
   ```

## Creazione di una `plugins.zip`
<a name="samples-plugins-timezone-plugins-zip"></a>

I passaggi seguenti spiegano come creare`plugins.zip`. Il contenuto di questo esempio può essere combinato con altri plugin e file binari in un unico `plugins.zip` file.

1. Nel prompt dei comandi, accedete alla `plugins` directory del passaggio precedente. Ad esempio:

   ```
   cd plugins
   ```

1. Comprimi il contenuto all'interno della `plugins` cartella.

   ```
   zip -r ../plugins.zip ./
   ```

1. Caricalo `plugins.zip` nel tuo bucket S3

   ```
   aws s3 cp plugins.zip s3://your-mwaa-bucket/
   ```

## Esempio di codice
<a name="samples-plugins-timezone-dag"></a>

Per cambiare il fuso orario predefinito (UTC\$10) in cui viene eseguito il DAG, useremo una libreria chiamata Pendulum[,](https://pypi.org/project/pendulum/) una libreria Python per lavorare con datetime che riconosce il fuso orario.

1. Nel prompt dei comandi, accedi alla directory in cui sei archiviato. DAGs Ad esempio:

   ```
   cd dags
   ```

1. Copiate il contenuto del seguente esempio e salvatelo con nome. `tz-aware-dag.py`

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from datetime import datetime, timedelta
   # Import the Pendulum library.
   import pendulum
   
   # Instantiate Pendulum and set your timezone.
   local_tz = pendulum.timezone("America/Los_Angeles")
   
   with DAG(
       dag_id = "tz_test",
       schedule_interval="0 12 * * *",
       catchup=False,
       start_date=datetime(2022, 1, 1, tzinfo=local_tz)
   ) as dag:
       bash_operator_task = BashOperator(
           task_id="tz_aware_task",
           dag=dag,
           bash_command="date"
       )
   ```

1.  Esegui il AWS CLI comando seguente per copiare il DAG nel bucket del tuo ambiente, quindi attiva il DAG utilizzando l'interfaccia utente di Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. In caso di successo, otterrete un risultato simile al seguente nei log delle attività per il DAG: `tz_aware_task` `tz_test`

   ```
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

## Fasi successive
<a name="samples-plugins-timezone-plugins-next-up"></a>
+ Scopri di più su come caricare il `plugins.zip` file in questo esempio nel tuo bucket Amazon S3 in. [Installazione di plugin personalizzati](configuring-dag-import-plugins.md)

# Aggiornamento di un token CodeArtifact
<a name="samples-code-artifact"></a>

Se utilizzi CodeArtifact per installare dipendenze Python, Amazon MWAA richiede un token attivo. Per consentire ad Amazon MWAA di accedere a un CodeArtifact repository in fase di esecuzione, puoi utilizzare uno [script di avvio](using-startup-script.md) e impostarlo [https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url](https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url)con il token.

Il seguente argomento descrive come creare uno script di avvio che utilizza l'operazione [https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token)CodeArtifact API per recuperare un nuovo token ogni volta che l'ambiente si avvia o si aggiorna.

**Topics**
+ [Versione](#samples-code-artifact-version)
+ [Prerequisiti](#samples-code-artifact-prereqs)
+ [Autorizzazioni](#samples-code-artifact-permissions)
+ [Esempio di codice](#samples-code-artifact-code)
+ [Fasi successive](#samples-code-artifact-next-up)

## Versione
<a name="samples-code-artifact-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-code-artifact-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).
+ Un [CodeArtifact repository](https://docs.aws.amazon.com/codeartifact/latest/ug/create-repo.html) in cui archiviare le dipendenze per il proprio ambiente.

## Autorizzazioni
<a name="samples-code-artifact-permissions"></a>

Per aggiornare il CodeArtifact token e scrivere il risultato su Amazon S3, Amazon MWAA deve disporre delle seguenti autorizzazioni nel ruolo di esecuzione.
+ L'`codeartifact:GetAuthorizationToken`azione consente ad Amazon MWAA di recuperare un nuovo token da. CodeArtifact La seguente politica concede l'autorizzazione per ogni CodeArtifact dominio creato. Puoi limitare ulteriormente l'accesso ai tuoi domini modificando il valore della risorsa nell'istruzione e specificando solo i domini a cui desideri che l'ambiente acceda.

  ```
  {
    "Effect": "Allow",
    "Action": "codeartifact:GetAuthorizationToken",
    "Resource": "arn:aws:codeartifact:us-west-2:*:domain/*"
  }
  ```
+ L'`sts:GetServiceBearerToken`azione è necessaria per richiamare l'operazione API. CodeArtifact [https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html](https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html) Questa operazione restituisce un token che deve essere utilizzato quando si utilizza un gestore di pacchetti come `pip` with CodeArtifact. Per utilizzare un gestore di pacchetti con un CodeArtifact repository, il ruolo del ruolo di esecuzione dell'ambiente deve consentire, `sts:GetServiceBearerToken` come elencato nella seguente dichiarazione politica.

  ```
  {
    "Sid": "AllowServiceBearerToken",
    "Effect": "Allow",
    "Action": "sts:GetServiceBearerToken",
    "Resource": "*"
  }
  ```

## Esempio di codice
<a name="samples-code-artifact-code"></a>

I passaggi seguenti descrivono come creare uno script di avvio che aggiorni il CodeArtifact token.

1. Copiate il contenuto del seguente esempio di codice e salvatelo localmente con nome`code_artifact_startup_script.sh`.

   ```
   #!/bin/sh
   
   # Startup script for MWAA, refer to https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html
   
   set -eu
   
   # setup code artifact endpoint and token
   # https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0
   # https://docs.aws.amazon.com/mwaa/latest/userguide/samples-code-artifact.html
   DOMAIN="amazon"
   DOMAIN_OWNER="112233445566"
   REGION="us-west-2"
   REPO_NAME="MyRepo"
   echo "Getting token for CodeArtifact with args: --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER"
   TOKEN=$(aws codeartifact get-authorization-token --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER | jq -r '.authorizationToken')
   echo "Setting Pip env var for '--index-url' to point to CodeArtifact"
   export PIP_EXTRA_INDEX_URL="https://aws:$TOKEN@$DOMAIN-$DOMAIN_OWNER.d.codeartifact.$REGION.amazonaws.com/pypi/$REPO_NAME/simple/"
   echo "CodeArtifact startup setup complete"
   ```

1. Accedere alla cartella in cui è stato salvato lo script. Utilizzalo `cp` in una nuova finestra di richiesta per caricare lo script nel tuo bucket. Sostituiscilo *amzn-s3-demo-bucket* con le tue informazioni.

   ```
   aws s3 cp code_artifact_startup_script.sh s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   In caso di successo, Amazon S3 restituisce il percorso URL dell'oggetto:

   ```
   upload: ./code_artifact_startup_script.sh to s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   Dopo aver caricato lo script, l'ambiente si aggiorna ed esegue lo script all'avvio.

## Fasi successive
<a name="samples-code-artifact-next-up"></a>
+ Scopri come utilizzare gli script di avvio per personalizzare il tuo ambiente. [Utilizzo di uno script di avvio con Amazon MWAA](using-startup-script.md)
+ Scopri come caricare il codice DAG in questo esempio nella `dags` cartella del tuo bucket Amazon S3 in. [Aggiungere o aggiornare DAGs](configuring-dag-folder.md)
+ Scopri di più su come caricare il `plugins.zip` file in questo esempio nel tuo bucket Amazon S3 in. [Installazione di plugin personalizzati](configuring-dag-import-plugins.md)

# Creazione di un plugin personalizzato con Apache Hive e Hadoop
<a name="samples-hive"></a>

Amazon MWAA estrae il contenuto di un to. `plugins.zip` `/usr/local/airflow/plugins` Questo può essere usato per aggiungere file binari ai contenitori. Inoltre, Apache Airflow esegue il contenuto dei file Python nella cartella *all'*avvio, consentendoti di impostare e modificare `plugins` le variabili di ambiente. L'esempio seguente illustra i passaggi per creare un plug-in personalizzato utilizzando Apache Hive e Hadoop in un ambiente Amazon Managed Workflows for Apache Airflow e può essere combinato con altri plug-in e binari personalizzati.

**Topics**
+ [Versione](#samples-hive-version)
+ [Prerequisiti](#samples-hive-prereqs)
+ [Autorizzazioni](#samples-hive-permissions)
+ [Requisiti](#samples-hive-dependencies)
+ [Scarica le dipendenze](#samples-hive-install)
+ [Plugin personalizzato](#samples-hive-plugins-code)
+ [Plugins.zip](#samples-hive-pluginszip)
+ [Esempio di codice](#samples-hive-code)
+ [Opzioni di configurazione di Airflow](#samples-hive-airflow-config)
+ [Fasi successive](#samples-hive-next-up)

## Versione
<a name="samples-hive-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-hive-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).

## Autorizzazioni
<a name="samples-hive-permissions"></a>

Non sono necessarie autorizzazioni aggiuntive per utilizzare l'esempio di codice in questa pagina.

## Requisiti
<a name="samples-hive-dependencies"></a>

Per utilizzare il codice di esempio in questa pagina, aggiungi le seguenti dipendenze al tuo. `requirements.txt` Per ulteriori informazioni, fare riferimento a. [Installazione delle dipendenze in Python](working-dags-dependencies.md)

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
apache-airflow-providers-amazon[apache.hive]
```

## Scarica le dipendenze
<a name="samples-hive-install"></a>

Amazon MWAA estrarrà il contenuto di plugins.zip in `/usr/local/airflow/plugins` ogni container di scheduler e worker Amazon MWAA. Viene utilizzato per aggiungere file binari al tuo ambiente. I passaggi seguenti descrivono come assemblare i file necessari per il plug-in personalizzato.

1. Nel prompt dei comandi, accedete alla directory in cui desiderate creare il plugin. Ad esempio:

   ```
   cd plugins
   ```

1. Scarica [Hadoop](https://hadoop.apache.org/) da un [mirror](https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz), ad esempio:

   ```
   wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
   ```

1. Scarica [Hive](https://hive.apache.org/) da un [mirror](https://www.apache.org/dyn/closer.cgi/hive/), ad esempio:

   ```
   wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
   ```

1. Crea una directory. Ad esempio:

   ```
   mkdir hive_plugin
   ```

1. Estrai Hadoop.

   ```
   tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
   ```

1. Estrai Hive.

   ```
   tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin
   ```

## Plugin personalizzato
<a name="samples-hive-plugins-code"></a>

Apache Airflow eseguirà il contenuto dei file Python nella cartella plugins all'avvio. Viene utilizzato per impostare e modificare le variabili di ambiente. I passaggi seguenti descrivono il codice di esempio per il plug-in personalizzato.

1. Nel prompt dei comandi, accedi alla `hive_plugin` directory. Ad esempio:

   ```
   cd hive_plugin
   ```

1. Copiate il contenuto del seguente esempio di codice e salvatelo localmente come `hive_plugin.py` nella `hive_plugin` directory.

   ```
   from airflow.plugins_manager import AirflowPlugin
   import os
   os.environ["JAVA_HOME"]="/usr/lib/jvm/jre"
   os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0'
   os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop'
   os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin'
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   class EnvVarPlugin(AirflowPlugin):                
       name = 'hive_plugin'
   ```

1. Copia il contenuto del testo seguente e salvalo localmente come `.airflowignore` nella `hive_plugin` directory.

   ```
   hadoop-3.3.0
   apache-hive-3.1.2-bin
   ```

## Plugins.zip
<a name="samples-hive-pluginszip"></a>

I passaggi seguenti spiegano come creare`plugins.zip`. Il contenuto di questo esempio può essere combinato con altri plugin e binari in un unico `plugins.zip` file.

1. Nel prompt dei comandi, accedete alla `hive_plugin` directory del passaggio precedente. Ad esempio:

   ```
   cd hive_plugin
   ```

1. Comprimi il contenuto all'interno della `plugins` cartella.

   ```
   zip -r ../hive_plugin.zip ./
   ```

## Esempio di codice
<a name="samples-hive-code"></a>

I passaggi seguenti descrivono come creare il codice DAG che testerà il plug-in personalizzato.

1. Nel prompt dei comandi, accedete alla directory in cui è memorizzato il codice DAG. Ad esempio:

   ```
   cd dags
   ```

1. Copia il contenuto del seguente esempio di codice e salvalo localmente come. `hive.py`

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.utils.dates import days_ago
   
   with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hive_test = BashOperator(
           task_id="hive_test",
           bash_command='hive --help'
       )
   ```

## Opzioni di configurazione di Airflow
<a name="samples-hive-airflow-config"></a>

Se utilizzi Apache Airflow v2, `core.lazy_load_plugins : False` aggiungilo come opzione di configurazione Apache Airflow. Per ulteriori informazioni, consulta [Usare le opzioni di configurazione per caricare i plugin in](configuring-env-variables.md#configuring-2.0-airflow-override) 2.

## Fasi successive
<a name="samples-hive-next-up"></a>
+ Scopri come caricare il `requirements.txt` file in questo esempio nel tuo bucket Amazon S3 in. [Installazione delle dipendenze in Python](working-dags-dependencies.md)
+ Scopri come caricare il codice DAG in questo esempio nella `dags` cartella del tuo bucket Amazon S3 in. [Aggiungere o aggiornare DAGs](configuring-dag-folder.md)
+ Scopri di più su come caricare il `plugins.zip` file in questo esempio nel tuo bucket Amazon S3 in. [Installazione di plugin personalizzati](configuring-dag-import-plugins.md)

# Creazione di un plugin personalizzato per Apache Airflow PythonVirtualenvOperator
<a name="samples-virtualenv"></a>

L'esempio seguente spiega come applicare una patch ad Apache `PythonVirtualenvOperator` Airflow con un plug-in personalizzato su Amazon Managed Workflows per Apache Airflow.

**Topics**
+ [Versione](#samples-virtualenv-version)
+ [Prerequisiti](#samples-virtualenv-prereqs)
+ [Autorizzazioni](#samples-virtualenv-permissions)
+ [Requisiti](#samples-virtualenv-dependencies)
+ [Codice di esempio del plug-in personalizzato](#samples-virtualenv-plugins-code)
+ [Plugins.zip](#samples-virtualenv-pluginszip)
+ [Esempio di codice](#samples-virtualenv-code)
+ [Opzioni di configurazione di Airflow](#samples-virtualenv-airflow-config)
+ [Fasi successive](#samples-virtualenv-next-up)

## Versione
<a name="samples-virtualenv-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-virtualenv-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).

## Autorizzazioni
<a name="samples-virtualenv-permissions"></a>

Non sono necessarie autorizzazioni aggiuntive per utilizzare l'esempio di codice in questa pagina.

## Requisiti
<a name="samples-virtualenv-dependencies"></a>

Per utilizzare il codice di esempio in questa pagina, aggiungi le seguenti dipendenze al tuo. `requirements.txt` Per ulteriori informazioni, fare riferimento a. [Installazione delle dipendenze in Python](working-dags-dependencies.md)

```
virtualenv
```

## Codice di esempio del plug-in personalizzato
<a name="samples-virtualenv-plugins-code"></a>

Apache Airflow eseguirà il contenuto dei file Python nella cartella plugins all'avvio. Questo plug-in applicherà una patch al componente integrato `PythonVirtualenvOperator` durante il processo di avvio per renderlo compatibile con Amazon MWAA. I passaggi seguenti mostrano il codice di esempio per il plug-in personalizzato.

1. Nel prompt dei comandi, accedete alla `plugins` directory nella sezione precedente. Ad esempio:

   ```
   cd plugins
   ```

1. Copiate il contenuto del seguente esempio di codice e salvatelo localmente con nome. `virtual_python_plugin.py`

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow.plugins_manager import AirflowPlugin
   import airflow.utils.python_virtualenv 
   from typing import List
   
   def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
       cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
       if system_site_packages:
           cmd.append('--system-site-packages')
       if python_bin is not None:
           cmd.append(f'--python={python_bin}')
       return cmd
   
   airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
   
   class VirtualPythonPlugin(AirflowPlugin):                
       name = 'virtual_python_plugin'
   ```

## Plugins.zip
<a name="samples-virtualenv-pluginszip"></a>

I passaggi seguenti spiegano come creare il`plugins.zip`.

1. Nel prompt dei comandi, accedi alla directory contenuta `virtual_python_plugin.py` nella sezione precedente. Ad esempio:

   ```
   cd plugins
   ```

1. Comprimi il contenuto all'interno della `plugins` cartella.

   ```
   zip plugins.zip virtual_python_plugin.py
   ```

## Esempio di codice
<a name="samples-virtualenv-code"></a>

I passaggi seguenti descrivono come creare il codice DAG per il plug-in personalizzato.

1. Nel prompt dei comandi, accedete alla directory in cui è memorizzato il codice DAG. Ad esempio:

   ```
   cd dags
   ```

1. Copiate il contenuto del seguente esempio di codice e salvatelo localmente con nome. `virtualenv_test.py`

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.operators.python import PythonVirtualenvOperator
   from airflow.utils.dates import days_ago
   import os
   
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"
   
   def virtualenv_fn():
       import boto3
       print("boto3 version ",boto3.__version__)
   
   with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       virtualenv_task = PythonVirtualenvOperator(
           task_id="virtualenv_task",
           python_callable=virtualenv_fn,
           requirements=["boto3>=1.17.43"],
           system_site_packages=False,
           dag=dag,
       )
   ```

## Opzioni di configurazione di Airflow
<a name="samples-virtualenv-airflow-config"></a>

Se utilizzi Apache Airflow v2, `core.lazy_load_plugins : False` aggiungilo come opzione di configurazione Apache Airflow. Per ulteriori informazioni, consulta [Usare le opzioni di configurazione per caricare i plugin in](configuring-env-variables.md#configuring-2.0-airflow-override) 2.

## Fasi successive
<a name="samples-virtualenv-next-up"></a>
+ Scopri come caricare il `requirements.txt` file in questo esempio nel tuo bucket Amazon S3 in. [Installazione delle dipendenze in Python](working-dags-dependencies.md)
+ Scopri come caricare il codice DAG in questo esempio nella `dags` cartella del tuo bucket Amazon S3 in. [Aggiungere o aggiornare DAGs](configuring-dag-folder.md)
+ Scopri di più su come caricare il `plugins.zip` file in questo esempio nel tuo bucket Amazon S3 in. [Installazione di plugin personalizzati](configuring-dag-import-plugins.md)

# Invocare DAGs con una funzione Lambda
<a name="samples-lambda"></a>

Il seguente esempio di codice utilizza una [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)funzione per ottenere un token CLI Apache Airflow e richiamare un grafo aciclico diretto (DAG) in un ambiente Amazon MWAA.

**Topics**
+ [Versione](#samples-lambda-version)
+ [Prerequisiti](#samples-lambda-prereqs)
+ [Permissions](#samples-lambda-permissions)
+ [Dipendenze](#samples-lambda-dependencies)
+ [Esempio di codice](#samples-lambda-code)

## Versione
<a name="samples-lambda-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-lambda-prereqs"></a>

Per utilizzare questo esempio di codice, devi:
+ Usa la [modalità di accesso alla rete pubblica](configuring-networking.md#webserver-options-public-network-onconsole) per il tuo ambiente [Amazon MWAA](get-started.md).
+ Disponi di una [funzione Lambda](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html) che utilizzi l'ultimo runtime di Python.

**Nota**  
Se la funzione Lambda e l'ambiente Amazon MWAA si trovano nello stesso VPC, puoi usare questo codice su una rete privata. Per questa configurazione, il ruolo di esecuzione della funzione Lambda richiede l'autorizzazione per chiamare l'operazione dell'API Amazon Elastic Compute Cloud (Amazon EC2). **CreateNetworkInterface** Puoi fornire questa autorizzazione utilizzando la policy -managed. [https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor](https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor) AWS

## Permissions
<a name="samples-lambda-permissions"></a>

Per utilizzare l'esempio di codice in questa pagina, il ruolo di esecuzione del tuo ambiente Amazon MWAA deve avere accesso per eseguire l'azione`airflow:CreateCliToken`. Puoi fornire questa autorizzazione utilizzando la policy `AmazonMWAAAirflowCliAccess` AWS-managed:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Per ulteriori informazioni, vedi [Politica della CLI di Apache Airflow: Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Dipendenze
<a name="samples-lambda-dependencies"></a>

Per utilizzare questo esempio di codice con Apache Airflow v2 e versioni successive, non sono necessarie dipendenze aggiuntive. Utilizzare [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)per installare Apache Airflow.

## Esempio di codice
<a name="samples-lambda-code"></a>

1. Apri la AWS Lambda console all'indirizzo. [https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/)

1. Scegli la tua funzione Lambda dall'elenco **Funzioni**.

1. Nella pagina della funzione, copia il codice seguente e sostituiscilo con i nomi delle tue risorse:
   + `YOUR_ENVIRONMENT_NAME`— Il nome del tuo ambiente Amazon MWAA.
   + `YOUR_DAG_NAME`— Il nome del DAG che desideri richiamare.

   ```
   import boto3
   import http.client
   import base64
   import ast
   mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
   dag_name = 'YOUR_DAG_NAME'
   mwaa_cli_command = 'dags trigger'
   ​
   client = boto3.client('mwaa')
   ​
   def lambda_handler(event, context):
       # get web token
       mwaa_cli_token = client.create_cli_token(
           Name=mwaa_env_name
       )
       
       conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
       payload = mwaa_cli_command + " " + dag_name
       headers = {
         'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
         'Content-Type': 'text/plain'
       }
       conn.request("POST", "/aws_mwaa/cli", payload, headers)
       res = conn.getresponse()
       data = res.read()
       dict_str = data.decode("UTF-8")
       mydata = ast.literal_eval(dict_str)
       return base64.b64decode(mydata['stdout'])
   ```

1. Seleziona **Implementa**.

1. Scegli **Test** per richiamare la tua funzione utilizzando la console Lambda.

1. Per verificare che Lambda abbia richiamato correttamente il DAG, utilizza la console Amazon MWAA per accedere all'interfaccia utente Apache Airflow del tuo ambiente, quindi procedi come segue:

   1. Nella **DAGs**pagina, individua il nuovo DAG di destinazione nell'elenco di. DAGs

   1. In **Ultima esecuzione**, controllate il timestamp dell'ultima esecuzione del DAG. Questo timestamp deve corrispondere molto da vicino al timestamp più recente utilizzato nell'altro ambiente. `invoke_dag`

   1. In **Attività recenti**, verifica che l'ultima esecuzione sia andata a buon fine.

# Richiamo DAGs in diversi ambienti Amazon MWAA
<a name="samples-invoke-dag"></a>

Il seguente esempio di codice crea un token CLI Apache Airflow. Il codice utilizza quindi un grafo aciclico diretto (DAG) in un ambiente Amazon MWAA per richiamare un DAG in un altro ambiente Amazon MWAA.

**Topics**
+ [Versione](#samples-invoke-dag-version)
+ [Prerequisiti](#samples-invoke-dag-prereqs)
+ [Permissions](#samples-invoke-dag-permissions)
+ [Dipendenze](#samples-invoke-dag-dependencies)
+ [Esempio di codice](#samples-invoke-dag-code)

## Versione
<a name="samples-invoke-dag-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-invoke-dag-prereqs"></a>

Per utilizzare l'esempio di codice in questa pagina, è necessario quanto segue:
+ Due [ambienti Amazon MWAA](get-started.md) con accesso al server Web **di rete pubblica**, incluso l'ambiente corrente.
+ Un DAG di esempio caricato nel bucket Amazon Simple Storage Service (Amazon S3) dell'ambiente di destinazione.

## Permissions
<a name="samples-invoke-dag-permissions"></a>

Per utilizzare l'esempio di codice in questa pagina, il ruolo di esecuzione dell'ambiente deve disporre dell'autorizzazione per creare un token CLI Apache Airflow. È possibile allegare la policy AWS-managed `AmazonMWAAAirflowCliAccess` per concedere questa autorizzazione.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Per ulteriori informazioni, vedi [Politica della CLI di Apache Airflow: Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Dipendenze
<a name="samples-invoke-dag-dependencies"></a>

Per utilizzare questo esempio di codice con Apache Airflow v2 e versioni successive, non sono necessarie dipendenze aggiuntive. Utilizzare [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)per installare Apache Airflow.

## Esempio di codice
<a name="samples-invoke-dag-code"></a>

Il seguente esempio di codice presuppone che stiate utilizzando un DAG nell'ambiente corrente per richiamare un DAG in un altro ambiente.

1. Nel terminale, accedete alla directory in cui è memorizzato il codice DAG. Esempio:

   ```
   cd dags
   ```

1. Copia il contenuto del seguente esempio di codice e salvalo localmente come`invoke_dag.py`. Sostituisci i seguenti valori con le tue informazioni.
   + `your-new-environment-name`— Il nome dell'altro ambiente in cui si desidera richiamare il DAG.
   + `your-target-dag-id`— L'ID del DAG nell'altro ambiente che si desidera richiamare.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  Esegui il AWS CLI comando seguente per copiare il DAG nel bucket del tuo ambiente, quindi attiva il DAG utilizzando l'interfaccia utente di Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Se il DAG viene eseguito correttamente, nei log delle attività di verrà visualizzato un output simile al seguente. `invoke_dag_task`

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Per verificare che il DAG sia stato richiamato correttamente, accedi all'interfaccia utente di Apache Airflow per il tuo nuovo ambiente, quindi procedi come segue:

   1. Nella **DAGs**pagina, individua il nuovo DAG di destinazione nell'elenco di. DAGs

   1. In **Ultima esecuzione**, controllate il timestamp dell'ultima esecuzione del DAG. Questo timestamp deve corrispondere molto da vicino al timestamp più recente utilizzato nell'altro ambiente. `invoke_dag`

   1. In **Attività recenti**, verifica che l'ultima esecuzione sia andata a buon fine.

# Utilizzo di Amazon MWAA con Amazon RDS per Microsoft SQL Server
<a name="samples-sql-server"></a>

Puoi utilizzare Amazon Managed Workflows for Apache Airflow per connetterti a [un](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_SQLServer.html) RDS per SQL Server. Il codice di esempio seguente viene utilizzato DAGs in un ambiente Amazon Managed Workflows for Apache Airflow per connettersi ed eseguire query su un Amazon RDS for Microsoft SQL Server.

**Topics**
+ [Versione](#samples-sql-server-version)
+ [Prerequisiti](#samples-sql-server-prereqs)
+ [Dipendenze](#samples-sql-server-dependencies)
+ [Connessione Apache Airflow v2](#samples-sql-server-conn)
+ [Esempio di codice](#samples-sql-server-code)
+ [Fasi successive](#samples-sql-server-next-up)

## Versione
<a name="samples-sql-server-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-sql-server-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).
+ Amazon MWAA e RDS per SQL Server sono in esecuzione nello stesso Amazon VPC/
+ I gruppi di sicurezza VPC di Amazon MWAA e del server sono configurati con le seguenti connessioni:
  + Una regola in entrata per la porta `1433` aperta per Amazon RDS nel gruppo di sicurezza di Amazon MWAA
  + Oppure una regola in uscita per la porta di `1433` apertura da Amazon MWAA a RDS
+ Apache Airflow Connection for RDS for SQL Server riflette il nome host, la porta, il nome utente e la password del database Amazon RDS SQL Server creato nel processo precedente.

## Dipendenze
<a name="samples-sql-server-dependencies"></a>

Per utilizzare il codice di esempio in questa sezione, aggiungi la seguente dipendenza al tuo. `requirements.txt` Per ulteriori informazioni, fare riferimento a. [Installazione delle dipendenze in Python](working-dags-dependencies.md)

```
apache-airflow-providers-microsoft-mssql==1.0.1
			apache-airflow-providers-odbc==1.0.1
			pymssql==2.2.1
```

## Connessione Apache Airflow v2
<a name="samples-sql-server-conn"></a>

Se utilizzi una connessione in Apache Airflow v2, assicurati che l'oggetto di connessione Airflow includa le seguenti coppie chiave-valore:

1. **ID** connessione: mssql\$1default

1. **Tipo di connessione:** Amazon Web Services

1. **Ospite:** `YOUR_DB_HOST`

1. **Schema:**

1. **Accesso:** admin

1. **Password:**

1. **Porta:** 1433

1. **Supplementare:**

## Esempio di codice
<a name="samples-sql-server-code"></a>

1. Nel prompt dei comandi, accedi alla directory in cui è memorizzato il codice DAG. Ad esempio:

   ```
   cd dags
   ```

1. Copia il contenuto del seguente esempio di codice e salvalo localmente come. `sql-server.py`

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   import pymssql
   import logging
   import sys
   from airflow import DAG
   from datetime import datetime
   from airflow.operators.mssql_operator import MsSqlOperator
   from airflow.operators.python_operator import PythonOperator
   
   default_args = {
       'owner': 'aws',
       'depends_on_past': False,
       'start_date': datetime(2019, 2, 20),
       'provide_context': True
   }
   
   dag = DAG(
       'mssql_conn_example', default_args=default_args, schedule_interval=None)
       
   drop_db = MsSqlOperator(
      task_id="drop_db",
      sql="DROP DATABASE IF EXISTS testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_db = MsSqlOperator(
      task_id="create_db",
      sql="create database testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_table = MsSqlOperator(
      task_id="create_table",
      sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   insert_into_table = MsSqlOperator(
      task_id="insert_into_table",
      sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   def select_pet(**kwargs):
      try:
           conn = pymssql.connect(
               server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com',
               user='admin',
               password='<yoursupersecretpassword>',
               database='testdb'
           )
           
           # Create a cursor from the connection
           cursor = conn.cursor()
           cursor.execute("SELECT * from testdb.dbo.pet")
           row = cursor.fetchone()
           
           if row:
               print(row)
      except:
         logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0])
   
   select_query = PythonOperator(
       task_id='select_query',
       python_callable=select_pet,
       dag=dag,
   )
   
   drop_db >> create_db >> create_table >> insert_into_table >> select_query
   ```

## Fasi successive
<a name="samples-sql-server-next-up"></a>
+ Scopri come caricare il `requirements.txt` file in questo esempio nel tuo bucket Amazon S3 in. [Installazione delle dipendenze in Python](working-dags-dependencies.md)
+ Scopri come caricare il codice DAG in questo esempio nella `dags` cartella del tuo bucket Amazon S3 in. [Aggiungere o aggiornare DAGs](configuring-dag-folder.md)
+ [Esplora script di esempio e altri esempi di moduli pymssql.](https://pymssql.readthedocs.io/en/stable/pymssql_examples.html)
+ *Scopri di più sull'esecuzione del codice SQL in uno specifico database Microsoft SQL utilizzando [mssql\$1operator](https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/mssql_operator/index.html?highlight=mssqloperator#airflow.operators.mssql_operator.MsSqlOperator) nella guida di riferimento di Apache Airflow.*

# Utilizzo di Amazon MWAA con Amazon EKS
<a name="mwaa-eks-example"></a>

L'esempio seguente dimostra come utilizzare Amazon Managed Workflows per Apache Airflow con Amazon EKS.

**Topics**
+ [Versione](#mwaa-eks-example-version)
+ [Prerequisiti](#eksctl-prereqs)
+ [Crea una chiave pubblica per Amazon EC2](#eksctl-create-key)
+ [Creazione del cluster](#create-cluster-eksctl)
+ [Crea un namespace `mwaa`](#eksctl-namespace)
+ [Crea un ruolo per il namespace `mwaa`](#eksctl-role)
+ [Crea e collega un ruolo IAM per il cluster Amazon EKS](#eksctl-iam-role)
+ [Crea il file requirements.txt](#eksctl-requirements)
+ [Crea una mappatura delle identità per Amazon EKS](#eksctl-identity-map)
+ [Creazione del `kubeconfig`](#eksctl-kube-config)
+ [Create un DAG](#eksctl-create-dag)
+ [Aggiungi il DAG e `kube_config.yaml` al bucket Amazon S3](#eksctl-dag-bucket)
+ [Abilita e attiva l'esempio](#eksctl-trigger-pod)

## Versione
<a name="mwaa-eks-example-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="eksctl-prereqs"></a>

Per utilizzare l'esempio in questo argomento, è necessario quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).
+ eksctl. Per ulteriori informazioni, consulta [Installare](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html#install-eksctl) eksctl.
+ kubectl. Per saperne di più, consulta [Installa e configura](https://kubernetes.io/docs/tasks/tools/install-kubectl/) kubectl. In alcuni casi questo viene installato con eksctl.
+ Una coppia di chiavi EC2 nella regione in cui crei il tuo ambiente Amazon MWAA. Per ulteriori informazioni, consulta [Creazione o importazione di una coppia di key pair](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair).

**Nota**  
Quando si utilizza un `eksctl` comando, è possibile includere un `--profile` per specificare un profilo diverso da quello predefinito.

## Crea una chiave pubblica per Amazon EC2
<a name="eksctl-create-key"></a>

Usa il seguente comando per creare una chiave pubblica dalla tua coppia di chiavi private.

```
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
```

Per maggiori informazioni, consulta [Recupero della chiave pubblica per la tua coppia di chiavi](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#retrieving-the-public-key).

## Creazione del cluster
<a name="create-cluster-eksctl"></a>

Usa il comando seguente per creare il cluster. Se desideri un nome personalizzato per il cluster o crearlo in una regione diversa, sostituisci i valori del nome e della regione. È necessario creare il cluster nella stessa regione in cui si crea l'ambiente Amazon MWAA. Sostituisci i valori delle sottoreti in modo che corrispondano alle sottoreti della tua rete Amazon VPC che usi per Amazon MWAA. Sostituisci il valore in modo che corrisponda alla `ssh-public-key` chiave che usi. Puoi utilizzare una chiave esistente di Amazon EC2 che si trova nella stessa regione o creare una nuova chiave nella stessa regione in cui crei il tuo ambiente Amazon MWAA.

```
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey \
--managed \
--vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \
--vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"
```

Il completamento della creazione del cluster richiede del tempo. Una volta completata, puoi verificare che il cluster sia stato creato correttamente e che il provider IAM OIDC sia configurato utilizzando il seguente comando:

```
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
```

## Crea un namespace `mwaa`
<a name="eksctl-namespace"></a>

Dopo aver verificato che il cluster è stato creato correttamente, utilizzate il seguente comando per creare uno spazio dei nomi per i pod.

```
kubectl create namespace mwaa
```

## Crea un ruolo per il namespace `mwaa`
<a name="eksctl-role"></a>

Dopo aver creato lo spazio dei nomi, crea un ruolo e un'associazione di ruoli per un utente Amazon MWAA su EKS che può eseguire pod in uno spazio dei nomi MWAA. Se hai usato un nome diverso per lo spazio dei nomi, sostituisci mwaa con il nome che hai usato. `-n mwaa`

```
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: mwaa-role
rules:
  - apiGroups:
  - ""
  - "apps"
  - "batch"
  - "extensions"
resources:      
  - "jobs"
  - "pods"
  - "pods/attach"
			- "pods/exec"
  - "pods/log"
  - "pods/portforward"
  - "secrets"
  - "services"
verbs:
  - "create"
  - "delete"
  - "describe"
  - "get"
  - "list"
  - "patch"
  - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
  subjects:
    - kind: User
  name: mwaa-service
  roleRef:
    kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
```

Verifica che il nuovo ruolo possa accedere al cluster Amazon EKS eseguendo il comando seguente. Assicurati di utilizzare il nome corretto se non hai usato*mwaa*:

```
kubectl get pods -n mwaa --as mwaa-service
```

Viene restituito un messaggio che dice:

```
No resources found in mwaa namespace.
```

## Crea e collega un ruolo IAM per il cluster Amazon EKS
<a name="eksctl-iam-role"></a>

È necessario creare un ruolo IAM e quindi associarlo al cluster Amazon EKS (k8s) in modo che possa essere utilizzato per l'autenticazione tramite IAM. Il ruolo viene utilizzato solo per accedere al cluster e non dispone di autorizzazioni per la console o le chiamate API.

Crea un nuovo ruolo per l'ambiente Amazon MWAA utilizzando i passaggi seguenti. [Ruolo di esecuzione di Amazon MWAA](mwaa-create-role.md) Tuttavia, anziché creare e allegare le politiche descritte in quell'argomento, allega la seguente politica:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:environment/${MWAA_ENV_NAME}"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
            "arn:aws:logs:us-east-1:111122223333:log-group:airflow-${MWAA_ENV_NAME}-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111122223333:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                    "sqs.us-east-1.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:us-east-1:111122223333:cluster/${EKS_CLUSTER_NAME}"
        }
    ]
}
```

------

Dopo aver creato il ruolo, modifica l'ambiente Amazon MWAA per utilizzare il ruolo creato come ruolo di esecuzione per l'ambiente. Per modificare il ruolo, modifica l'ambiente da utilizzare. È possibile selezionare il ruolo di esecuzione in **Autorizzazioni**.

**Problemi noti:**
+ Esiste un problema noto relativo al ruolo ARNs con i subpath che non sono in grado di autenticarsi con Amazon EKS. La soluzione alternativa consiste nel creare il ruolo di servizio manualmente anziché utilizzare quello creato dallo stesso Amazon MWAA. Per saperne di più, consulta [Ruoli con percorsi non funzionano quando il percorso è incluso nel loro ARN nella configmap di aws-auth](https://github.com/kubernetes-sigs/aws-iam-authenticator/issues/268)
+ Se l'elenco dei servizi Amazon MWAA non è disponibile in IAM, devi scegliere una politica di servizio alternativa, come Amazon EC2, e quindi aggiornare la politica di fiducia del ruolo in modo che corrisponda a quanto segue:

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": [
            "airflow-env.amazonaws.com",
            "airflow.amazonaws.com"
          ]
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }
  ```

------

  Per ulteriori informazioni, consulta [Come utilizzare le politiche di fiducia con i ruoli](https://aws.amazon.com/blogs/security/how-to-use-trust-policies-with-iam-roles/) IAM.

## Crea il file requirements.txt
<a name="eksctl-requirements"></a>

Per utilizzare il codice di esempio in questa sezione, assicurati di aver aggiunto una delle seguenti opzioni di database al tuo`requirements.txt`. Per ulteriori informazioni, consulta[Installazione delle dipendenze in Python](working-dags-dependencies.md).

```
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
```

## Crea una mappatura delle identità per Amazon EKS
<a name="eksctl-identity-map"></a>

Usa l'ARN per il ruolo che hai creato nel seguente comando per creare una mappatura delle identità per Amazon EKS. Cambia la regione nella *us-east-1* regione in cui hai creato l'ambiente. Sostituisci l'ARN per il ruolo e, infine, sostituiscilo *mwaa-execution-role* con il ruolo di esecuzione del tuo ambiente.

```
eksctl create iamidentitymapping \
--region us-east-1 \
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012:role/mwaa-execution-role \
--username mwaa-service
```

## Creazione del `kubeconfig`
<a name="eksctl-kube-config"></a>

Utilizzate il seguente comando per creare: `kubeconfig`

```
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
```

Se hai utilizzato un profilo specifico durante l'esecuzione, `update-kubeconfig` devi rimuovere la `env:` sezione aggiunta al file kube\$1config.yaml in modo che funzioni correttamente con Amazon MWAA. A tale scopo, elimina quanto segue dal file e poi salvalo:

```
env:
 - name: AWS_PROFILE
 value: profile_name
```

## Create un DAG
<a name="eksctl-create-dag"></a>

Usa il seguente esempio di codice per creare un file Python, ad esempio `mwaa_pod_example.py` per il DAG.

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,
                       config_file=kube_config_path,
                       in_cluster=False,
                       cluster_context='aws'
                       )
```

## Aggiungi il DAG e `kube_config.yaml` al bucket Amazon S3
<a name="eksctl-dag-bucket"></a>

Inserisci il DAG che hai creato e il `kube_config.yaml` file nel bucket Amazon S3 per l'ambiente Amazon MWAA. Puoi inserire file nel tuo bucket utilizzando la console Amazon S3 o il. AWS Command Line Interface

## Abilita e attiva l'esempio
<a name="eksctl-trigger-pod"></a>

In Apache Airflow, abilita l'esempio e poi attivalo.

Dopo che è stato eseguito e completato correttamente, utilizzate il seguente comando per verificare il pod:

```
kubectl get pods -n mwaa
```

Otterrete un risultato simile al seguente:

```
NAME READY STATUS RESTARTS AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s
```

È quindi possibile verificare l'output del pod con il seguente comando. Sostituisci il valore del nome con il valore restituito dal comando precedente:

```
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888
```

# Connessione ad Amazon ECS tramite `ECSOperator`
<a name="samples-ecs-operator"></a>

Questo argomento descrive come utilizzare il `ECSOperator` per connettersi a un container Amazon Elastic Container Service (Amazon ECS) da Amazon MWAA. Nei passaggi seguenti, aggiungerai le autorizzazioni necessarie al ruolo di esecuzione del tuo ambiente, utilizzerai un CloudFormation modello per creare un cluster Amazon ECS Fargate e infine creerai e caricherai un DAG che si connette al tuo nuovo cluster.

**Topics**
+ [Versione](#samples-ecs-operator-version)
+ [Prerequisiti](#samples-ecs-operator-prereqs)
+ [Permissions](#samples-ecs-operator-permissions)
+ [Crea un cluster Amazon ECS](#create-cfn-template)
+ [Esempio di codice](#samples-ecs-operator-code)

## Versione
<a name="samples-ecs-operator-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-ecs-operator-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).

## Permissions
<a name="samples-ecs-operator-permissions"></a>
+ Il ruolo di esecuzione per il tuo ambiente richiede l'autorizzazione per eseguire attività in Amazon ECS. Puoi allegare la politica FullAccess AWS gestita da [AmazonECS\$1](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonECS_FullAccess$jsonEditor) al tuo ruolo di esecuzione oppure creare e allegare la seguente politica al tuo ruolo di esecuzione.

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "ecs:RunTask",
                  "ecs:DescribeTasks"
              ],
              "Resource": "*"
          },
          {
              "Action": "iam:PassRole",
              "Effect": "Allow",
              "Resource": [
                  "*"
              ],
              "Condition": {
                  "StringLike": {
                      "iam:PassedToService": "ecs-tasks.amazonaws.com"
                  }
              }
          }
      ]
  }
  ```

------
+ Oltre ad aggiungere le autorizzazioni necessarie per eseguire attività in Amazon ECS, devi anche modificare l'informativa sulla politica CloudWatch Logs nel tuo ruolo di esecuzione Amazon MWAA per consentire l'accesso al gruppo di log di attività di Amazon ECS come elencato di seguito. Il gruppo di log di Amazon ECS viene creato dal CloudFormation modello in[Crea un cluster Amazon ECS](#create-cfn-template).

  ```
  {
    "Effect": "Allow",
    "Action": [
      "logs:CreateLogStream",
      "logs:CreateLogGroup",
      "logs:PutLogEvents",
      "logs:GetLogEvents",
      "logs:GetLogRecord",
      "logs:GetLogGroupFields",
      "logs:GetQueryResults"
    ],
    "Resource": [
      "arn:aws:logs:us-east-1:123456789012:log-group:airflow-environment-name-*",
      "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*"
    ]
  }
  ```

Per ulteriori informazioni sul ruolo di esecuzione di Amazon MWAA e su come allegare una policy, consulta. [Ruolo di esecuzione](mwaa-create-role.md)

## Crea un cluster Amazon ECS
<a name="create-cfn-template"></a>

Utilizzando il CloudFormation modello seguente, creerai un cluster Amazon ECS Fargate da utilizzare con il tuo flusso di lavoro Amazon MWAA. Per ulteriori informazioni, consulta [Creating a task definition](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-task-definition) nella *Amazon Elastic Container Service Developer Guide*.

1. Crea un file JSON con il codice seguente e salvalo con nome. `ecs-mwaa-cfn.json`

   ```
   {
       "AWSTemplateFormatVersion": "2010-09-09",
       "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.",
       "Parameters": {
           "VpcId": {
               "Type": "AWS::EC2::VPC::Id",
               "Description": "Select a VPC that allows instances access to ECR, as used with MWAA."
           },
           "SubnetIds": {
               "Type": "List<AWS::EC2::Subnet::Id>",
               "Description": "Select at two private subnets in your selected VPC, as used with MWAA."
           },
           "SecurityGroups": {
               "Type": "List<AWS::EC2::SecurityGroup::Id>",
               "Description": "Select at least one security group in your selected VPC, as used with MWAA."
           }
       },
       "Resources": {
           "Cluster": {
               "Type": "AWS::ECS::Cluster",
               "Properties": {
                   "ClusterName": {
                       "Fn::Sub": "${AWS::StackName}-cluster"
                   }
               }
           },
           "LogGroup": {
               "Type": "AWS::Logs::LogGroup",
               "Properties": {
                   "LogGroupName": {
                       "Ref": "AWS::StackName"
                   },
                   "RetentionInDays": 30
               }
           },
           "ExecutionRole": {
               "Type": "AWS::IAM::Role",
               "Properties": {
                   "AssumeRolePolicyDocument": {
                       "Statement": [
                           {
                               "Effect": "Allow",
                               "Principal": {
                                   "Service": "ecs-tasks.amazonaws.com"
                               },
                               "Action": "sts:AssumeRole"
                           }
                       ]
                   },
                   "ManagedPolicyArns": [
                       "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
                   ]
               }
           },
           "TaskDefinition": {
               "Type": "AWS::ECS::TaskDefinition",
               "Properties": {
                   "Family": {
                       "Fn::Sub": "${AWS::StackName}-task"
                   },
                   "Cpu": 2048,
                   "Memory": 4096,
                   "NetworkMode": "awsvpc",
                   "ExecutionRoleArn": {
                       "Ref": "ExecutionRole"
                   },
                   "ContainerDefinitions": [
                       {
                           "Name": {
                               "Fn::Sub": "${AWS::StackName}-container"
                           },
                           "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
                           "PortMappings": [
                               {
                                   "Protocol": "tcp",
                                   "ContainerPort": 8080,
                                   "HostPort": 8080
                               }
                           ],
                           "LogConfiguration": {
                               "LogDriver": "awslogs",
                               "Options": {
                                   "awslogs-region": {
                                       "Ref": "AWS::Region"
                                   },
                                   "awslogs-group": {
                                       "Ref": "LogGroup"
                                   },
                                   "awslogs-stream-prefix": "ecs"
                               }
                           }
                       }
                   ],
                   "RequiresCompatibilities": [
                       "FARGATE"
                   ]
               }
           },
           "Service": {
               "Type": "AWS::ECS::Service",
               "Properties": {
                   "ServiceName": {
                       "Fn::Sub": "${AWS::StackName}-service"
                   },
                   "Cluster": {
                       "Ref": "Cluster"
                   },
                   "TaskDefinition": {
                       "Ref": "TaskDefinition"
                   },
                   "DesiredCount": 1,
                   "LaunchType": "FARGATE",
                   "PlatformVersion": "1.3.0",
                   "NetworkConfiguration": {
                       "AwsvpcConfiguration": {
                           "AssignPublicIp": "ENABLED",
                           "Subnets": {
                               "Ref": "SubnetIds"
                           },
                           "SecurityGroups": {
                               "Ref": "SecurityGroups"
                           }
                       }
                   }
               }
           }
       }
   }
   ```

1. Nel prompt dei comandi, usa il seguente AWS CLI comando per creare un nuovo stack. Devi sostituire i valori `SecurityGroups` e `SubnetIds` con i valori per i gruppi di sicurezza e le sottoreti del tuo ambiente Amazon MWAA.

   ```
   aws cloudformation create-stack \
   --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \
   --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \
   ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \
   --capabilities CAPABILITY_IAM
   ```

   In alternativa, puoi utilizzare il seguente script di shell. Lo script recupera i valori richiesti per i gruppi di sicurezza e le sottoreti dell'ambiente utilizzando il `[get-environment](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/mwaa/get-environment.html)` AWS CLI comando, quindi crea lo stack di conseguenza. Per eseguire lo script, procedi come segue.

   1. Copiate e salvate lo script `ecs-stack-helper.sh` nella stessa directory del CloudFormation modello.

      ```
      #!/bin/bash
      
      joinByString() {
        local separator="$1"
        shift
        local first="$1"
        shift
        printf "%s" "$first" "${@/#/$separator}"
      }
      
      response=$(aws mwaa get-environment --name $1)
      
      securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]')
      subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]'))
      
      aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \
      --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \
      ParameterKey=SubnetIds,ParameterValue=$subnetIds \
      --capabilities CAPABILITY_IAM
      ```

   1. Esegui lo script utilizzando i seguenti comandi. Sostituisci `environment-name` e `stack-name` con le tue informazioni.

      ```
      chmod +x ecs-stack-helper.sh
      ./ecs-stack-helper.bash environment-name stack-name
      ```

   In caso di successo, farai riferimento al seguente output che mostra il nuovo ID CloudFormation dello stack.

   ```
   {
     "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
   }
   ```

Una volta completato CloudFormation lo stack e AWS aver effettuato il provisioning delle risorse Amazon ECS, sei pronto per creare e caricare il tuo DAG.

## Esempio di codice
<a name="samples-ecs-operator-code"></a>

1. Apri un prompt dei comandi e accedi alla directory in cui è archiviato il codice DAG. Esempio:

   ```
   cd dags
   ```

1. Copia il contenuto del seguente esempio di codice e salvalo localmente con nome`mwaa-ecs-operator.py`, quindi carica il nuovo DAG su Amazon S3.

   ```
   from http import client
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.ecs import ECSOperator
   from airflow.utils.dates import days_ago
   import boto3
   
   CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information.
   CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information.
   LAUNCH_TYPE="FARGATE"
   
   with DAG(
       dag_id = "ecs_fargate_dag",
       schedule_interval=None,
       catchup=False,
       start_date=days_ago(1)
   ) as dag:
       client=boto3.client('ecs')
       services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE)
       service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns'])
   
       ecs_operator_task = ECSOperator(
           task_id = "ecs_operator_task",
           dag=dag,
           cluster=CLUSTER_NAME,
           task_definition=service['services'][0]['taskDefinition'],
           launch_type=LAUNCH_TYPE,
           overrides={
               "containerOverrides":[
                   {
                       "name":CONTAINER_NAME,
                       "command":["ls", "-l", "/"],
                   },
               ],
           },
   
           network_configuration=service['services'][0]['networkConfiguration'],
           awslogs_group="mwaa-ecs-zero",
           awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}",
       )
   ```
**Nota**  
Nell'esempio DAG, for`awslogs_group`, potrebbe essere necessario modificare il gruppo di log con il nome del gruppo di task log di Amazon ECS. L'esempio presuppone un gruppo di log denominato. `mwaa-ecs-zero` Per`awslogs_stream_prefix`, usa il prefisso del flusso del task log di Amazon ECS. L'esempio presuppone un prefisso del flusso di log,. `ecs`

1.  Esegui il AWS CLI comando seguente per copiare il DAG nel bucket del tuo ambiente, quindi attiva il DAG utilizzando l'interfaccia utente di Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. In caso di successo, otterrete un output simile al seguente nei log delle attività del DAG: `ecs_operator_task` `ecs_fargate_dag`

   ```
   [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
   Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
   [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
   {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
   [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed
   ```

# Usare dbt con Amazon MWAA
<a name="samples-dbt"></a>

Questo argomento dimostra come utilizzare dbt e Postgres con Amazon MWAA. Nei passaggi seguenti, aggiungerai le dipendenze richieste al tuo `requirements.txt` e caricherai un progetto dbt di esempio nel bucket Amazon S3 del tuo ambiente. Quindi, utilizzerai un DAG di esempio per verificare che Amazon MWAA abbia installato le dipendenze e infine utilizzerai il `BashOperator` per eseguire il progetto dbt.

**Topics**
+ [Versione](#samples-dbt-version)
+ [Prerequisiti](#samples-dbt-prereqs)
+ [Dipendenze](#samples-dbt-dependencies)
+ [Caricare un progetto dbt su Amazon S3](#samples-dbt-upload-project)
+ [Utilizzate un DAG per verificare l'installazione della dipendenza da dbt](#samples-dbt-test-dependencies)
+ [Usa un DAG per eseguire un progetto dbt](#samples-dbt-run-project)

## Versione
<a name="samples-dbt-version"></a>

**[È possibile utilizzare l'esempio di codice in questa pagina con **Apache Airflow v2 in Python 3.10 e Apache Airflow v3**[in Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prerequisiti
<a name="samples-dbt-prereqs"></a>

Prima di poter completare i seguenti passaggi, avrai bisogno di quanto segue:
+ Un [ambiente Amazon MWAA che utilizza Apache](get-started.md) Airflow v2.2.2. Questo esempio è stato scritto e testato con la versione 2.2.2. Potrebbe essere necessario modificare l'esempio per utilizzarlo con altre versioni di Apache Airflow.
+ Un esempio di progetto dbt. Per iniziare a usare dbt con Amazon MWAA, puoi creare un fork e clonare il progetto [dbt starter dal repository dbt-labs](https://github.com/dbt-labs/dbt-starter-project). GitHub 

## Dipendenze
<a name="samples-dbt-dependencies"></a>

Per usare Amazon MWAA con dbt, aggiungi il seguente script di avvio al tuo ambiente. Per ulteriori informazioni, consulta [Usare uno script di avvio con Amazon MWAA](using-startup-script.md).

```
#!/bin/bash
			
  if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]]
    then
      exit 0
  fi
			
  echo "------------------------------"
  echo "Installing virtual Python env"
  echo "------------------------------"
			
  pip3 install --upgrade pip
			
  echo "Current Python version:"
  python3 --version 
  echo "..."
			
  sudo pip3 install --user virtualenv
  sudo mkdir python3-virtualenv
  cd python3-virtualenv
  sudo python3 -m venv dbt-env
  sudo chmod -R 777 *
			
  echo "------------------------------"
  echo "Activating venv in"
  $DBT_ENV_PATH
	  		echo "------------------------------"
			
  source dbt-env/bin/activate
  pip3 list
			
  echo "------------------------------"
  echo "Installing libraries..."
  echo "------------------------------"
			
  # do not use sudo, as it will install outside the venv
  pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1
			
  echo "------------------------------"
  echo "Venv libraries..."
  echo "------------------------------"
			
  pip3 list
  dbt --version
			
  echo "------------------------------"
  echo "Deactivating venv..."
  echo "------------------------------"
			
  deactivate
```

Nelle sezioni seguenti, caricherai la tua directory di progetto dbt su Amazon S3 ed eseguirai un DAG che convalida se Amazon MWAA ha installato correttamente le dipendenze dbt richieste.

## Caricare un progetto dbt su Amazon S3
<a name="samples-dbt-upload-project"></a>

Per poter utilizzare un progetto dbt con il tuo ambiente Amazon MWAA, puoi caricare l'intera directory del progetto nella cartella del tuo ambiente. `dags` Quando l'ambiente si aggiorna, Amazon MWAA scarica la directory dbt nella cartella locale. `usr/local/airflow/dags/`

**Per caricare un progetto dbt su Amazon S3**

1. Passa alla directory in cui hai clonato il progetto dbt starter.

1. Esegui il seguente AWS CLI comando Amazon S3 per copiare in modo ricorsivo il contenuto del progetto `dags` nella cartella del tuo ambiente utilizzando il parametro. `--recursive` Il comando crea una sottodirectory chiamata `dbt` che puoi usare per tutti i tuoi progetti dbt. Se la sottodirectory esiste già, i file di progetto vengono copiati nella directory esistente e non viene creata una nuova directory. Il comando crea anche una sottodirectory all'interno della `dbt` directory per questo specifico progetto iniziale.

   ```
   aws s3 cp dbt-starter-project s3://amzn-s3-demo-bucket/dags/dbt/dbt-starter-project --recursive
   ```

   È possibile utilizzare nomi diversi per le sottodirectory del progetto per organizzare più progetti dbt all'interno della directory principale. `dbt`

## Utilizzate un DAG per verificare l'installazione della dipendenza da dbt
<a name="samples-dbt-test-dependencies"></a>

Il seguente DAG utilizza un comando `BashOperator` e un comando bash per verificare se Amazon MWAA ha installato correttamente le dipendenze dbt specificate in. `requirements.txt`

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version"
			)
```

Effettua le seguenti operazioni per accedere ai log delle attività e verificare che dbt e le sue dipendenze siano stati installati.

1. Accedi alla console Amazon MWAA, quindi scegli **Open Airflow UI** dall'elenco degli ambienti disponibili.

1. Nell'interfaccia utente di Apache Airflow, trova `dbt-installation-test` il DAG dall'elenco, quindi scegli la data nella colonna per aprire `Last Run` l'ultima attività riuscita.

1. Utilizzando **Graph View**, scegliete l'`bash_command`attività per aprire i dettagli dell'istanza dell'operazione.

1. Scegliete **Log** per aprire i log delle attività, quindi verificate che i log elencino correttamente la versione dbt in cui è stata specificata. `requirements.txt`

## Usa un DAG per eseguire un progetto dbt
<a name="samples-dbt-run-project"></a>

Il seguente DAG utilizza `BashOperator` a per copiare i progetti dbt caricati su Amazon S3 dalla directory `usr/local/airflow/dags/` locale alla directory `/tmp` accessibile in scrittura, quindi esegue il progetto dbt. I comandi bash presuppongono un progetto dbt iniziale intitolato. `dbt-starter-project` Modifica il nome della directory in base al nome della directory del tuo progetto.

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			import os
			
			DAG_ID = os.path.basename(__file__).replace(".py", "")
			
			# assumes all files are in a subfolder of DAGs called dbt
			
			with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\
			cp -R /usr/local/airflow/dags/dbt /tmp;\
			echo 'listing project files:';\
			ls -R /tmp;\
			cd /tmp/dbt/mwaa_dbt_test_project;\
			/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\
			cat /tmp/dbt_logs/dbt.log;\
			rm -rf /tmp/dbt/mwaa_dbt_test_project"
			)
```

## AWS blog e tutorial
<a name="samples-blogs-tutorials"></a>
+ [Utilizzo di Amazon EKS e Amazon MWAA per Apache Airflow v2.x](https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12)