

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Códigos de ejemplo de Amazon Managed Workflows para Apache Airflow
<a name="sample-code"></a>

Esta guía contiene ejemplos de código, incluidos DAGs complementos personalizados, que puede utilizar en un entorno de Amazon Managed Workflows para Apache Airflow. Para ver más ejemplos del uso de Apache Airflow con AWS servicios, consulte el [https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags](https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags)directorio del repositorio de Apache GitHub Airflow.

**Topics**
+ [Uso de un DAG para importar variables en la CLI](samples-variables-import.md)
+ [Creación de una conexión SSH mediante `SSHOperator`](samples-ssh.md)
+ [Uso de una clave secreta en AWS Secrets Manager para una conexión de Apache Airflow Snowflake](samples-sm-snowflake.md)
+ [Uso de un DAG para escribir métricas personalizadas en CloudWatch](samples-custom-metrics.md)
+ [Limpieza de bases de datos de Aurora PostgreSQL en un entorno de Amazon MWAA](samples-database-cleanup.md)
+ [Exportación de metadatos del entorno a archivos CSV en Amazon S3](samples-dag-run-info-to-csv.md)
+ [Uso de una clave secreta en AWS Secrets Manager para una variable de Apache Airflow](samples-secrets-manager-var.md)
+ [Uso de una clave secreta en AWS Secrets Manager para una conexión de Apache Airflow](samples-secrets-manager.md)
+ [Creación de un complemento personalizado con Oracle](samples-oracle.md)
+ [Cómo cambiar la zona horaria de un DAG en Amazon MWAA](samples-plugins-timezone.md)
+ [Actualización de un token de CodeArtifact](samples-code-artifact.md)
+ [Creación de un complemento personalizado con Apache Hive y Hadoop](samples-hive.md)
+ [Creación de un complemento personalizado para Apache Airflow PythonVirtualEnvOperator](samples-virtualenv.md)
+ [Invocar DAGs con una función Lambda](samples-lambda.md)
+ [Invocación DAGs en diferentes entornos de Amazon MWAA](samples-invoke-dag.md)
+ [Uso de Amazon MWAA con Amazon RDS para Microsoft SQL Server](samples-sql-server.md)
+ [Uso de imágenes de Amazon ECR con Amazon EKS](mwaa-eks-example.md)
+ [Conexión a Amazon ECS mediante el `ECSOperator`](samples-ecs-operator.md)
+ [Uso de dbt con Amazon MWAA](samples-dbt.md)
+ [AWS blogs y tutoriales](#samples-blogs-tutorials)

# Uso de un DAG para importar variables en la CLI
<a name="samples-variables-import"></a>

La siguiente muestra de código importa variables mediante la CLI de Amazon Managed Workflows para Apache Airflow.

**Topics**
+ [Versión](#samples-variables-import-version)
+ [Requisitos previos](#samples-variables-import-prereqs)
+ [Permisos](#samples-variables-import-permissions)
+ [Dependencias](#samples-variables-import-dependencies)
+ [Código de ejemplo](#samples-variables-import-code)
+ [Siguientes pasos](#samples-variables-import-next-up)

## Versión
<a name="samples-variables-import-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-variables-import-prereqs"></a>

No se necesitan permisos adicionales para usar el código de ejemplo de esta página.

## Permisos
<a name="samples-variables-import-permissions"></a>

Su Cuenta de AWS necesita acceso a la política `AmazonMWAAAirflowCliAccess`. Consulte [Política de CLI de Apache Airflow: Amazon MWAAAirflow CliAccess](access-policies.md) para obtener más información.

## Dependencias
<a name="samples-variables-import-dependencies"></a>

Para usar este código de ejemplo con Apache Airflow v2 y versiones posteriores, no se necesitan dependencias adicionales. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) para instalar Apache Airflow.

## Código de ejemplo
<a name="samples-variables-import-code"></a>

En la siguiente muestra de código, se requieren tres entradas: el nombre de su entorno de Amazon MWAA (en `mwaa_env`), la Región de AWS de su entorno (en `aws_region`) y el archivo local que contiene las variables que desea importar (en `var_file`).

```
import boto3
import json
import requests 
import base64
import getopt
import sys

argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''

try:
    opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region'])
    #if len(opts) == 0 and len(opts) > 3:
    if len(opts) != 3:
        print ('Usage: -e MWAA environment -v variable file location and filename -r aws region')
    else:
        for opt, arg in opts:
            if opt in ("-e"):
                mwaa_env=arg
            elif opt in ("-r"):
                aws_region=arg
            elif opt in ("-v"):
                var_file=arg

        boto3.setup_default_session(region_name="{}".format(aws_region))
        mwaa_env_name = "{}".format(mwaa_env)

        client = boto3.client('mwaa')
        mwaa_cli_token = client.create_cli_token(
            Name=mwaa_env_name
        )
        
        with open ("{}".format(var_file), "r") as myfile:
            fileconf = myfile.read().replace('\n', '')

        json_dictionary = json.loads(fileconf)
        for key in json_dictionary:
            print(key, " ", json_dictionary[key])
            val = (key + " " + json_dictionary[key])
            mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
            mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
            raw_data = "variables set {0}".format(val)
            mwaa_response = requests.post(
                mwaa_webserver_hostname,
                headers={
                    'Authorization': mwaa_auth_token,
                    'Content-Type': 'text/plain'
                    },
                data=raw_data
                )
            mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
            mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
            print(mwaa_response.status_code)
            print(mwaa_std_err_message)
            print(mwaa_std_out_message)

except:
    print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
    print("Unexpected error:", sys.exc_info()[0])
    sys.exit(2)
```

## Siguientes pasos
<a name="samples-variables-import-next-up"></a>
+ Aprenda a cargar el código el DAG de este ejemplo en la carpeta `dags` de su bucket de Amazon S3 en [Cómo añadir o actualizar DAG](configuring-dag-folder.md).

# Creación de una conexión SSH mediante `SSHOperator`
<a name="samples-ssh"></a>

El siguiente ejemplo describe cómo puede utilizar el `SSHOperator` en un gráfico acíclico dirigido (DAG) para conectarse a una instancia remota de Amazon EC2 desde su entorno Amazon Managed Workflows para Apache Airflow. Puede utilizar un enfoque similar para conectarse a cualquier instancia remota con acceso SSH.

En el siguiente ejemplo, se cargará una clave secreta SSH (`.pem`) en el directorio `dags` de su entorno en Amazon S3. A continuación, instale las dependencias necesarias mediante `requirements.txt` y cree una nueva conexión de Apache Airflow en la interfaz de usuario. Por último, escribe un DAG que crea una conexión SSH con la instancia remota.

**Topics**
+ [Versión](#samples-ssh-version)
+ [Requisitos previos](#samples-ssh-prereqs)
+ [Permisos](#samples-ssh-permissions)
+ [Requisitos](#samples-ssh-dependencies)
+ [Cómo copiar su clave secreta en Amazon S3](#samples-ssh-secret)
+ [Creación de una nueva conexión con Apache Airflow](#samples-ssh-connection)
+ [Código de ejemplo](#samples-ssh-code)

## Versión
<a name="samples-ssh-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-ssh-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).
+ Una clave secreta de SSH. En el código de muestra se supone que tiene una instancia de Amazon EC2 y una `.pem` en la misma región que su entorno de Amazon MWAA. Si no tiene una clave SSH, consulte [Crear o importar un par de claves](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair) en la *guía del usuario de Amazon EC2*.

## Permisos
<a name="samples-ssh-permissions"></a>

No se necesitan permisos adicionales para usar el código de ejemplo de esta página.

## Requisitos
<a name="samples-ssh-dependencies"></a>

Añada el siguiente parámetro a `requirements.txt` para instalar el paquete `apache-airflow-providers-ssh` en el servidor web. Una vez que su entorno se actualice y Amazon MWAA instale correctamente la dependencia, verá un nuevo tipo de conexión **SSH** en la UI.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt
apache-airflow-providers-ssh
```

**nota**  
`-c` define la URL de las restricciones en `requirements.txt`. Esto garantiza que Amazon MWAA instale la versión de paquete correcta para su entorno.

## Cómo copiar su clave secreta en Amazon S3
<a name="samples-ssh-secret"></a>

Utilice el siguiente comando de la AWS Command Line Interface para copiar la clave `.pem` en el directorio `dags` de su entorno en Amazon S3.

```
aws s3 cp your-secret-key.pem s3://amzn-s3-demo-bucket/dags/
```

Amazon MWAA copia el contenido en `dags`, incluida la clave `.pem`, en el directorio local `/usr/local/airflow/dags/`. De este modo, Apache Airflow puede acceder a la clave.

## Creación de una nueva conexión con Apache Airflow
<a name="samples-ssh-connection"></a>

**Creación de una nueva conexión SSH mediante la interfaz de usuario de Apache Airflow**

1. Abra la página [Entornos](https://console.aws.amazon.com/mwaa/home#/environments) en la consola de Amazon MWAA.

1. En la lista de entornos, elija **Abrir la interfaz de usuario de Airflow** para su entorno.

1. En la página de UI de Apache Airflow, seleccione **Admin** (Administrador) en la barra de navegación superior para ampliar la lista desplegable y, a continuación, seleccione **Connections** (Conexiones).

1. En la página **Listar conexiones**, seleccione **\$1** o el botón **Añadir un nuevo registro** para añadir una nueva conexión.

1. En la página **Conectar a AD**, proporcione la siguiente información:

   1. En **Nombre de conexión** introduzca **ssh\$1new**.

   1. Para el **tipo de conexión**, seleccione **SSH** en la lista desplegable.
**nota**  
Si el tipo de conexión **SSH** no está disponible en la lista, Amazon MWAA no ha instalado el paquete `apache-airflow-providers-ssh` necesario. Actualice el archivo `requirements.txt` para incluir este paquete e inténtelo de nuevo.

   1. Para **Host**, introduzca la dirección IP de la instancia de Amazon EC2 a la que desee conectarse. Por ejemplo, **12.345.67.89**.

   1. En **Nombre de usuario**, introduzca **ec2-user** si se está conectando a una instancia de Amazon EC2. Su nombre de usuario puede ser diferente, según el tipo de instancia remota a la que desee que se conecte Apache Airflow.

   1. Para **Extra**, introduzca el siguiente par clave-valor en formato JSON:

      ```
      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }
      ```

      Este par clave-valor indica a Apache Airflow que busque la clave secreta en el directorio local `/dags`.

## Código de ejemplo
<a name="samples-ssh-code"></a>

El siguiente DAG utiliza `SSHOperator` para conectarse a la instancia de Amazon EC2 de destino y, a continuación, ejecuta el comando de Linux `hostname` para imprimir el nombre de la instancia. Puede modificar el DAG para ejecutar cualquier comando o script en la instancia remota.

1. Abra una terminal y navegue hasta el directorio en el que está almacenado el código del DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del código de ejemplo siguiente y guárdelo localmente como `ssh.py`.

   ```
   from airflow.decorators import dag
   from datetime import datetime
   from airflow.providers.ssh.operators.ssh import SSHOperator
   
   @dag(
       dag_id="ssh_operator_example",
       schedule_interval=None,     
       start_date=datetime(2022, 1, 1),
       catchup=False,
       )
   def ssh_dag():
       task_1=SSHOperator(
           task_id="ssh_task",
           ssh_conn_id='ssh_new',
           command='hostname',
       )
   
   my_ssh_dag = ssh_dag()
   ```

1.  Ejecute el siguiente comando de la AWS CLI para copiar el DAG en el bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Si se ejecuta correctamente, verá un resultado similar al siguiente en los registros de tareas del `ssh_task` en el DAG `ssh_operator_example`:

   ```
   [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
   Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
   [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
   [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
   [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
   [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916
   ```

# Uso de una clave secreta en AWS Secrets Manager para una conexión de Apache Airflow Snowflake
<a name="samples-sm-snowflake"></a>

En el siguiente ejemplo, se llama a AWS Secrets Manager para obtener una clave secreta para una conexión de Apache Airflow Snowflake en Amazon Managed Workflows para Apache Airflow. Se asume que ha realizado los pasos que se detallan en [Configuración de una conexión de Apache Airflow mediante un secreto AWS Secrets Manager](connections-secrets-manager.md).

**Topics**
+ [Versión](#samples-sm-snowflake-version)
+ [Requisitos previos](#samples-sm-snowflake-prereqs)
+ [Permisos](#samples-sm-snowflake-permissions)
+ [Requisitos](#samples-sm-snowflake-dependencies)
+ [Código de ejemplo](#samples-sm-snowflake-code)
+ [Siguientes pasos](#samples-sm-snowflake-next-up)

## Versión
<a name="samples-sm-snowflake-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-sm-snowflake-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ El backend de Secrets Manager como opción de configuración de Apache Airflow, como se muestra en [Configuración de una conexión de Apache Airflow mediante un secreto AWS Secrets Manager](connections-secrets-manager.md).
+ Una cadena de conexión de Apache Airflow en Secrets Manager, como aparece en [Configuración de una conexión de Apache Airflow mediante un secreto AWS Secrets Manager](connections-secrets-manager.md).

## Permisos
<a name="samples-sm-snowflake-permissions"></a>
+ Permisos de Secrets Manager, como se muestra en [Configuración de una conexión de Apache Airflow mediante un secreto AWS Secrets Manager](connections-secrets-manager.md).

## Requisitos
<a name="samples-sm-snowflake-dependencies"></a>

Para usar el código de ejemplo de esta página, agregue las siguientes dependencias a su `requirements.txt`. Consulte [Instalación de dependencias de Python](working-dags-dependencies.md) para obtener más información.

```
apache-airflow-providers-snowflake==1.3.0
```

## Código de ejemplo
<a name="samples-sm-snowflake-code"></a>

En los siguientes pasos se describe cómo crear el código DAG que llama a Secrets Manager para recibir el secreto.

1. En el símbolo del sistema, vaya hasta el directorio en el que esté almacenado el código DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del código de ejemplo siguiente y guárdelo localmente como `snowflake_connection.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   from airflow.utils.dates import days_ago
   
   snowflake_query = [
       """use warehouse "MY_WAREHOUSE";""",
       """select * from "SNOWFLAKE_SAMPLE_DATA"."WEATHER"."WEATHER_14_TOTAL" limit 100;""",
   ]
   
   with DAG(dag_id='snowflake_test', schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       snowflake_select = SnowflakeOperator(
           task_id="snowflake_select",
           sql=snowflake_query,
           snowflake_conn_id="snowflake_conn",
       )
   ```

## Siguientes pasos
<a name="samples-sm-snowflake-next-up"></a>
+ Aprenda a cargar el código el DAG de este ejemplo en la carpeta `dags` de su bucket de Amazon S3 en [Cómo añadir o actualizar DAG](configuring-dag-folder.md).

# Uso de un DAG para escribir métricas personalizadas en CloudWatch
<a name="samples-custom-metrics"></a>

Puede usar el siguiente código de ejemplo para escribir un gráfico acíclico dirigido (DAG) que ejecute un `PythonOperator` para recuperar métricas a nivel de sistema operativo para un entorno de Amazon MWAA. A continuación, el DAG publica los datos como métricas personalizadas en Amazon CloudWatch.

Las métricas personalizadas a nivel del sistema operativo proporcionan una visibilidad adicional sobre la forma en que los procesos de trabajo de su entorno utilizan recursos como la memoria virtual y la CPU. Puede utilizar esta información para seleccionar la [clase de entorno](environment-class.md) que mejor se adapte a su carga de trabajo.

**Topics**
+ [Versión](#samples-custom-metrics-version)
+ [Requisitos previos](#samples-custom-metrics-prereqs)
+ [Permisos](#samples-custom-metrics-permissions)
+ [Dependencias](#samples-custom-metrics-dependencies)
+ [Ejemplo de código](#samples-custom-metrics-code)

## Versión
<a name="samples-custom-metrics-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-custom-metrics-prereqs"></a>

Para usar el ejemplo de código en esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).

## Permisos
<a name="samples-custom-metrics-permissions"></a>

No se necesitan permisos adicionales para usar el código de ejemplo de esta página.

## Dependencias
<a name="samples-custom-metrics-dependencies"></a>
+ No se necesitan dependencias adicionales para usar el código de ejemplo de esta página.

## Ejemplo de código
<a name="samples-custom-metrics-code"></a>

1. En el símbolo del sistema, vaya hasta la carpeta en la que se encuentra almacenado el código DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del siguiente código de ejemplo y guárdelo localmente como `dag-custom-metrics.py`. Sustituya `MWAA-ENV-NAME` por el nombre de su entorno.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   from datetime import datetime
   import os,json,boto3,psutil,socket
   
   def publish_metric(client,name,value,cat,unit='None'):
       environment_name = os.getenv("MWAA_ENV_NAME")
       value_number=float(value)
       hostname = socket.gethostname()
       ip_address = socket.gethostbyname(hostname)
       print('writing value',value_number,'to metric',name)
       response = client.put_metric_data(
           Namespace='MWAA-Custom',
           MetricData=[
               {
                   'MetricName': name,
                   'Dimensions': [
                       {
                           'Name': 'Environment',
                           'Value': environment_name
                       },
                       {
                           'Name': 'Category',
                           'Value': cat
                       },       
                       {
                           'Name': 'Host',
                           'Value': ip_address
                       },                                     
                   ],
                   'Timestamp': datetime.now(),
                   'Value': value_number,
                   'Unit': unit
               },
           ]
       )
       print(response)
       return response
   
   def python_fn(**kwargs):
       client = boto3.client('cloudwatch')
   
       cpu_stats = psutil.cpu_stats()
       print('cpu_stats', cpu_stats)
   
       virtual = psutil.virtual_memory()
       cpu_times_percent = psutil.cpu_times_percent(interval=0)
   
       publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent')
   
       publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent')
   
       return "OK"
   
   
   with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag:
       t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
   ```

1.  Ejecute el siguiente comando de la AWS CLI para copiar el DAG en el bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Si el DAG se ejecuta correctamente, debería aparecer algo similar a lo siguiente en los registros de Apache Airflow:

   ```
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   ...
   [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
   [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
   ```

# Limpieza de bases de datos de Aurora PostgreSQL en un entorno de Amazon MWAA
<a name="samples-database-cleanup"></a>

Amazon Managed Workflows para Apache Airflow utiliza una base de datos Aurora PostgreSQL como base de datos de metadatos de Apache Airflow, donde se ejecuta el DAG y se almacenan las instancias de tareas. La siguiente muestra de código borra periódicamente las entradas de la base de datos Aurora PostgreSQL dedicada a su entorno Amazon MWAA.

**Topics**
+ [Versión](#samples-database-cleanup-version)
+ [Requisitos previos](#samples-database-cleanup-prereqs)
+ [Dependencias](#samples-sql-server-dependencies)
+ [Código de ejemplo](#samples-database-cleanup-code)

## Versión
<a name="samples-database-cleanup-version"></a>

Los ejemplos de código de esta página son específicos de Apache Airflow v2, compatible con Amazon MWAA. Consulte las versiones de [Apache Airflow compatibles](airflow-versions.md).

**sugerencia**  
**Para los usuarios de Apache Airflow v3**: si desea limpiar una base de datos (purgar los registros antiguos de las tablas del almacén de metadatos), ejecute el comando `db clean` en la CLI.

## Requisitos previos
<a name="samples-database-cleanup-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).

## Dependencias
<a name="samples-sql-server-dependencies"></a>

Para usar este código de ejemplo con Apache Airflow v2, no se necesitan dependencias adicionales. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)Utilícelo para instalar Apache Airflow.

## Código de ejemplo
<a name="samples-database-cleanup-code"></a>

El siguiente DAG limpia la base de datos de metadatos de las tablas especificadas en `TABLES_TO_CLEAN`. En el ejemplo, se eliminan los datos de las tablas especificadas con más de 30 días de antigüedad. Para ajustar el tiempo de eliminación de las entradas, establezca `MAX_AGE_IN_DAYS` en un valor diferente.

------
#### [ Apache Airflow v2.4 to 2.10.3 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```

------
#### [ Apache Airflow v2.2 and earlier ]

```
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)

def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
```

------

# Exportación de metadatos del entorno a archivos CSV en Amazon S3
<a name="samples-dag-run-info-to-csv"></a>

Use el siguiente código de muestra para crear un gráfico acíclico dirigido (DAG) que consulte en la base de datos un rango de información de ejecución del DAG y escriba los datos en los archivos `.csv` almacenados en Amazon S3.

Es posible que deba exportar información de la base de datos Aurora PostgreSQL de su entorno para inspeccionar los datos localmente, archivarlos en un almacenamiento de objetos o combinarlos con herramientas como el [operador Amazon S3 a Amazon Redshift](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) y la [limpieza de la base de datos](samples-database-cleanup.md), a fin de sacar los metadatos de Amazon MWAA del entorno y conservarlos para futuros análisis.

Puede consultar en la base de datos cualquiera de los objetos enumerados en los [modelos de Apache Airflow.](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models) En este código de muestra, se usan tres modelos, `DagRun`, `TaskFail` y `TaskInstance`, que proporcionan información relevante para las ejecuciones del DAG.

**Topics**
+ [Versión](#samples-dag-run-info-to-csv-version)
+ [Requisitos previos](#samples-dag-run-info-to-csv-prereqs)
+ [Permisos](#samples-dag-run-info-to-csv-permissions)
+ [Requisitos](#samples-dag-run-info-to-csv-dependencies)
+ [Código de ejemplo](#samples-dag-run-info-to-csv-code)

## Versión
<a name="samples-dag-run-info-to-csv-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-dag-run-info-to-csv-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).
+ Un [nuevo bucket de Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) a donde desea exportar su información de metadatos.

## Permisos
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA necesita permiso para que la acción `s3:PutObject` de Amazon S3 escriba la información de metadatos consultada en Amazon S3. Añada la siguiente instrucción de política al rol de ejecución de su entorno.

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

Esta política limita el acceso por escrito únicamente a*amzn-s3-demo-bucket*.

## Requisitos
<a name="samples-dag-run-info-to-csv-dependencies"></a>

Para usar este código de ejemplo con Apache Airflow v2 y versiones posteriores, no se necesitan dependencias adicionales. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)Utilícelo para instalar Apache Airflow.

## Código de ejemplo
<a name="samples-dag-run-info-to-csv-code"></a>

Los siguientes pasos describen cómo puede crear un DAG que consulte Aurora PostgreSQL y escriba el resultado en su nuevo bucket de Amazon S3.

1. En su terminal, vaya hasta el directorio en el que está almacenado el código de DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del siguiente código de ejemplo y guárdelo localmente como `metadata_to_csv.py`. Puede cambiar el valor asignado `MAX_AGE_IN_DAYS` para controlar la antigüedad de los registros más antiguos que el DAG consulta en la base de datos de metadatos.

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  Ejecute el siguiente AWS CLI comando para copiar el DAG al bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Si se ejecuta correctamente, verá un resultado similar al siguiente en los registros de tareas para la tarea `export_db`:

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Ahora puede acceder a los archivos `.csv` exportados y descargarlos en su nuevo bucket de Amazon S3 en `/files/export/`.

# Uso de una clave secreta en AWS Secrets Manager para una variable de Apache Airflow
<a name="samples-secrets-manager-var"></a>

El siguiente ejemplo, llama a AWS Secrets Manager para obtener una clave secreta para una variable de Apache Airflow en Amazon Managed Workflows para Apache Airflow. Se asume que ha realizado los pasos que se detallan en [Configuración de una conexión de Apache Airflow mediante un secreto AWS Secrets Manager](connections-secrets-manager.md).

**Topics**
+ [Versión](#samples-secrets-manager-var-version)
+ [Requisitos previos](#samples-secrets-manager-var-prereqs)
+ [Permisos](#samples-secrets-manager-var-permissions)
+ [Requisitos](#samples-hive-dependencies)
+ [Código de ejemplo](#samples-secrets-manager-var-code)
+ [Siguientes pasos](#samples-secrets-manager-var-next-up)

## Versión
<a name="samples-secrets-manager-var-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-secrets-manager-var-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ El backend de Secrets Manager como opción de configuración de Apache Airflow, como se muestra en [Configuración de una conexión de Apache Airflow mediante un secreto AWS Secrets Manager](connections-secrets-manager.md).
+ Una cadena de variables de Apache Airflow en Secrets Manager, como se muestra en [Configuración de una conexión de Apache Airflow mediante un secreto AWS Secrets Manager](connections-secrets-manager.md).

## Permisos
<a name="samples-secrets-manager-var-permissions"></a>
+ Permisos de Secrets Manager, como se muestra en [Configuración de una conexión de Apache Airflow mediante un secreto AWS Secrets Manager](connections-secrets-manager.md).

## Requisitos
<a name="samples-hive-dependencies"></a>

Para usar este código de ejemplo con Apache Airflow v2 y versiones posteriores, no se necesitan dependencias adicionales. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) para instalar Apache Airflow.

## Código de ejemplo
<a name="samples-secrets-manager-var-code"></a>

En los siguientes pasos se describe cómo crear el código DAG que llama a Secrets Manager para recibir el secreto.

1. En el símbolo del sistema, vaya hasta el directorio en el que esté almacenado el código DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del código de ejemplo siguiente y guárdelo localmente como `secrets-manager-var.py`.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.models import Variable
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   import os
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['airflow@example.com'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   def get_variable_fn(**kwargs):
       my_variable_name = Variable.get("test-variable", default_var="undefined")
       print("my_variable_name: ", my_variable_name)
       return my_variable_name
   with DAG(
       dag_id=DAG_ID,
       default_args=DEFAULT_ARGS,
       dagrun_timeout=timedelta(hours=2),
       start_date=days_ago(1),
       schedule_interval='@once',
       tags=['variable']
   ) as dag:
       get_variable = PythonOperator(
           task_id="get_variable",
           python_callable=get_variable_fn,
           provide_context=True
       )
   ```

## Siguientes pasos
<a name="samples-secrets-manager-var-next-up"></a>
+ Aprenda a cargar el código el DAG de este ejemplo en la carpeta `dags` de su bucket de Amazon S3 en [Cómo añadir o actualizar DAG](configuring-dag-folder.md).

# Uso de una clave secreta en AWS Secrets Manager para una conexión de Apache Airflow
<a name="samples-secrets-manager"></a>

El siguiente ejemplo, se llama a AWS Secrets Manager para obtener una clave secreta para una conexión de Apache Airflow en Amazon Managed Workflows para Apache Airflow. Se asume que ha realizado los pasos que se detallan en [Configuración de una conexión de Apache Airflow mediante un secreto AWS Secrets Manager](connections-secrets-manager.md).

**Topics**
+ [Versión](#samples-secrets-manager-version)
+ [Requisitos previos](#samples-secrets-manager-prereqs)
+ [Permisos](#samples-secrets-manager-permissions)
+ [Requisitos](#samples-hive-dependencies)
+ [Código de ejemplo](#samples-secrets-manager-code)
+ [Siguientes pasos](#samples-secrets-manager-next-up)

## Versión
<a name="samples-secrets-manager-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-secrets-manager-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ El backend de Secrets Manager como opción de configuración de Apache Airflow, como se muestra en [Configuración de una conexión de Apache Airflow mediante un secreto AWS Secrets Manager](connections-secrets-manager.md).
+ Una cadena de conexión de Apache Airflow en Secrets Manager, como aparece en [Configuración de una conexión de Apache Airflow mediante un secreto AWS Secrets Manager](connections-secrets-manager.md).

## Permisos
<a name="samples-secrets-manager-permissions"></a>
+ Permisos de Secrets Manager, como se muestra en [Configuración de una conexión de Apache Airflow mediante un secreto AWS Secrets Manager](connections-secrets-manager.md).

## Requisitos
<a name="samples-hive-dependencies"></a>

Para usar este código de ejemplo con Apache Airflow v2 y versiones posteriores, no se necesitan dependencias adicionales. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) para instalar Apache Airflow.

## Código de ejemplo
<a name="samples-secrets-manager-code"></a>

En los siguientes pasos se describe cómo crear el código DAG que llama a Secrets Manager para recibir el secreto.

1. En el símbolo del sistema, vaya hasta el directorio en el que esté almacenado el código DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del código de ejemplo siguiente y guárdelo localmente como `secrets-manager.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG, settings, secrets
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   from datetime import timedelta
   import os
   
   ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html
   sm_secretId_name = 'airflow/connections/myconn'
   
   default_args = {
       'owner': 'airflow',
       'start_date': days_ago(1),
       'depends_on_past': False
   }
   
   
   ### Gets the secret myconn from Secrets Manager
   def read_from_aws_sm_fn(**kwargs):
       ### set up Secrets Manager
       hook = AwsBaseHook(client_type='secretsmanager')
       client = hook.get_client_type(region_name='us-east-1')
       response = client.get_secret_value(SecretId=sm_secretId_name)
       myConnSecretString = response["SecretString"]
   
       return myConnSecretString
   
   ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager
   with DAG(
           dag_id=os.path.basename(__file__).replace(".py", ""),
           default_args=default_args,
           dagrun_timeout=timedelta(hours=2),
           start_date=days_ago(1),
           schedule_interval=None
   ) as dag:
       write_all_to_aws_sm = PythonOperator(
           task_id="read_from_aws_sm",
           python_callable=read_from_aws_sm_fn,
           provide_context=True
       )
   ```

## Siguientes pasos
<a name="samples-secrets-manager-next-up"></a>
+ Aprenda a cargar el código el DAG de este ejemplo en la carpeta `dags` de su bucket de Amazon S3 en [Cómo añadir o actualizar DAG](configuring-dag-folder.md).

# Creación de un complemento personalizado con Oracle
<a name="samples-oracle"></a>

En el siguiente ejemplo, se explican los pasos necesarios para crear un complemento personalizado con Oracle para Amazon MWAA y se puede combinar con otros complementos y binarios personalizados en el archivo plugins.zip.

**Contents**
+ [Versión](#samples-oracle-version)
+ [Requisitos previos](#samples-oracle-prereqs)
+ [Permisos](#samples-oracle-permissions)
+ [Requisitos](#samples-oracle-dependencies)
+ [Código de ejemplo](#samples-oracle-code)
+ [Creación del complemento personalizado](#samples-oracle-create-pluginszip-steps)
  + [Descarga de dependencias](#samples-oracle-install)
  + [Complemento personalizado](#samples-oracle-plugins-code)
  + [Plugins.zip](#samples-oracle-pluginszip)
+ [Opciones de configuración de Airflow](#samples-oracle-airflow-config)
+ [Siguientes pasos](#samples-oracle-next-up)

## Versión
<a name="samples-oracle-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-oracle-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).
+ El registro de procesos de trabajo habilitado en cualquier nivel de registro, `CRITICAL` o en la sección anterior a su entorno. Para obtener más información sobre los tipos de registros de Amazon MWAA y sobre cómo administrar sus grupos de registros, consulte [Acceder a los registros de Airflow en Amazon CloudWatch](monitoring-airflow.md)

## Permisos
<a name="samples-oracle-permissions"></a>

No se necesitan permisos adicionales para usar el código de ejemplo de esta página.

## Requisitos
<a name="samples-oracle-dependencies"></a>

Para usar el código de ejemplo de esta página, agregue las siguientes dependencias a su `requirements.txt`. Consulte [Instalación de dependencias de Python](working-dags-dependencies.md) para obtener más información.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
cx_Oracle
apache-airflow-providers-oracle
```

## Código de ejemplo
<a name="samples-oracle-code"></a>

En los siguientes pasos se explica cómo crear el código DAG que probará el complemento personalizado.

1. En el símbolo del sistema, vaya hasta el directorio en el que esté almacenado el código DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del código de ejemplo siguiente y guárdelo localmente como `oracle.py`.

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   import os
   import cx_Oracle
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   def testHook(**kwargs):
       cx_Oracle.init_oracle_client()
       version = cx_Oracle.clientversion()
       print("cx_Oracle.clientversion",version)
       return version
   
   with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hook_test = PythonOperator(
           task_id="hook_test",
           python_callable=testHook,
           provide_context=True 
       )
   ```

## Creación del complemento personalizado
<a name="samples-oracle-create-pluginszip-steps"></a>

En esta sección se describe cómo descargar las dependencias, crear el complemento personalizado y el archivo plugins.zip.

### Descarga de dependencias
<a name="samples-oracle-install"></a>

Amazon MWAA extraerá el contenido del archivo plugins.zip en `/usr/local/airflow/plugins` en cada contenedor de procesos de trabajo y programador de Amazon MWAA. Se utiliza para añadir binarios a su entorno. En los siguientes pasos se describe cómo ensamblar los archivos necesarios para el complemento personalizado.

**Extracción de la imagen del contenedor Linux de Amazon**

1. En el símbolo del sistema, extraiga la imagen del contenedor Linux de Amazon y ejecútelo localmente. Por ejemplo:

   ```
   docker pull amazonlinux
   						docker run -it amazonlinux:latest /bin/bash
   ```

   El símbolo del sistema puede invocar un símbolo del sistema bash. Por ejemplo:

   ```
   bash-4.2#
   ```

1. Instale la característica de E/S asíncrona nativa de Linux (libaio).

   ```
   yum -y install libaio
   ```

1. Mantenga esta ventana abierta para los pasos siguientes. Copiaremos los siguientes archivos en el sistema local: `lib64/libaio.so.1`, `lib64/libaio.so.1.0.0`, `lib64/libaio.so.1.0.1`.

**Descarga de la carpeta del cliente**

1. Instale el paquete de descompresión localmente. Por ejemplo:

   ```
   sudo yum install unzip
   ```

1. Cree un directorio de `oracle_plugin`. Por ejemplo:

   ```
   mkdir oracle_plugin
   cd oracle_plugin
   ```

1. Utilice el siguiente comando curl para descargar el archivo [instantclient-basic-linux.x64-18.5.0.0.0dbru.zip](https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip) de [Oracle Instant Client Downloads para Linux x86-64 (64 bits)](https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html).

   ```
   curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
   ```

1. Descomprima el archivo `client.zip`. Por ejemplo:

   ```
   unzip *.zip
   ```

**Extracción de archivos de Docker**

1. En un nuevo símbolo del sistema, muestre y anote su ID de contenedor de Docker. Por ejemplo:

   ```
   docker container ls
   ```

   El símbolo del sistema debería mostrar todos los contenedores y sus ID. Por ejemplo:

   ```
   debc16fd6970
   ```

1. En su directorio `oracle_plugin`, extraiga los archivos `lib64/libaio.so.1`, `lib64/libaio.so.1.0.0`, `lib64/libaio.so.1.0.1` en la carpeta `instantclient_18_5` local. Por ejemplo:

   ```
   docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/
   ```

### Complemento personalizado
<a name="samples-oracle-plugins-code"></a>

Apache Airflow ejecutará el contenido de los archivos de Python en la carpeta de complementos durante el arranque. Esto se usa para establecer y modificar variables de entorno. En los siguientes pasos se describe el código de muestra del complemento personalizado.
+ Copie el contenido del código de ejemplo siguiente y guárdelo localmente como `env_var_plugin_oracle.py`.

  ```
  from airflow.plugins_manager import AirflowPlugin
  import os
  
  os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5'
  os.environ["DPI_DEBUG_LEVEL"]="64"
  
  class EnvVarPlugin(AirflowPlugin):                
      name = 'env_var_plugin'
  ```

### Plugins.zip
<a name="samples-oracle-pluginszip"></a>

En los siguientes pasos, se muestra cómo crear el `plugins.zip`. El contenido de este ejemplo se puede combinar con sus otros complementos y binarios en un solo archivo `plugins.zip`.

**Compresión del contenido del directorio de complementos**

1. En una línea de comando, vaya al directorio `oracle_plugin`. Por ejemplo:

   ```
   cd oracle_plugin
   ```

1. Comprima el directorio `instantclient_18_5` en plugins.zip. Por ejemplo:

   ```
   zip -r ../plugins.zip ./
   ```

   La línea de comandos muestra lo siguiente:

   ```
   oracle_plugin$ ls
   client.zip		instantclient_18_5
   ```

1. Elimine el archivo `client.zip`. Por ejemplo:

   ```
   rm client.zip
   ```

**Comprima el archivo env\$1var\$1plugin\$1oracle.py**

1. Agregue el archivo `env_var_plugin_oracle.py` a la raíz de plugins.zip. Por ejemplo:

   ```
   zip plugins.zip env_var_plugin_oracle.py
   ```

1. Ahora su archivo plugins.zip debería incluir lo siguiente:

   ```
   env_var_plugin_oracle.py
   instantclient_18_5/
   ```

## Opciones de configuración de Airflow
<a name="samples-oracle-airflow-config"></a>

Si utiliza Apache Airflow v2, agregue `core.lazy_load_plugins : False` como opción de configuración de Apache Airflow. Para obtener más información, consulte [Uso de las opciones de configuración para cargar complementos en la versión 2](configuring-env-variables.md#configuring-2.0-airflow-override).

## Siguientes pasos
<a name="samples-oracle-next-up"></a>
+ Aprenda a cargar el archivo `requirements.txt` de este ejemplo a su bucket de Amazon S3 en [Instalación de dependencias de Python](working-dags-dependencies.md).
+ Aprenda a cargar el código el DAG de este ejemplo en la carpeta `dags` de su bucket de Amazon S3 en [Cómo añadir o actualizar DAG](configuring-dag-folder.md).
+ Obtenga más información sobre cómo cargar el archivo `plugins.zip` de este ejemplo a su bucket de Amazon S3 en [Instalación de complementos personalizados](configuring-dag-import-plugins.md).

# Cómo cambiar la zona horaria de un DAG en Amazon MWAA
<a name="samples-plugins-timezone"></a>

Apache Airflow programa su gráfico acíclico dirigido (DAG) en UTC\$10 de forma predeterminada. En los siguientes pasos, se muestra cómo puede cambiar la zona horaria en la que Amazon MWAA ejecuta sus DAG con [Pendulum](https://pypi.org/project/pendulum/). Opcionalmente, en este tema se muestra cómo puede crear un complemento personalizado para cambiar la zona horaria de los registros de Apache Airflow de su entorno.

**Topics**
+ [Versión](#samples-plugins-timezone-version)
+ [Requisitos previos](#samples-plugins-timezone-prerequisites)
+ [Permisos](#samples-plugins-timezone-permissions)
+ [Creación de un complemento para cambiar la zona horaria en los registros de Airflow](#samples-plugins-timezone-custom-plugin)
+ [Creación de un `plugins.zip`](#samples-plugins-timezone-plugins-zip)
+ [Código de ejemplo](#samples-plugins-timezone-dag)
+ [Siguientes pasos](#samples-plugins-timezone-plugins-next-up)

## Versión
<a name="samples-plugins-timezone-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-plugins-timezone-prerequisites"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).

## Permisos
<a name="samples-plugins-timezone-permissions"></a>

No se necesitan permisos adicionales para usar el código de ejemplo de esta página.

## Creación de un complemento para cambiar la zona horaria en los registros de Airflow
<a name="samples-plugins-timezone-custom-plugin"></a>

Apache Airflow ejecutará los archivos de Python en el directorio de `plugins` al inicio. Con el siguiente complemento, puede anular la zona horaria del ejecutor, lo que modifica la zona horaria en la que Apache Airflow escribe los registros.

1. Cree un directorio llamado `plugins` para su complemento personalizado y vaya al directorio. Por ejemplo:

   ```
   $ mkdir plugins
   $ cd plugins
   ```

1. Copie el contenido de la siguiente muestra de código y guárdelo localmente como `dag-timezone-plugin.py` en la carpeta `plugins`.

   ```
   import time
   import os
   
   os.environ['TZ'] = 'America/Los_Angeles'
   time.tzset()
   ```

1. En el directorio `plugins`, cree un archivo de Python vacío llamado `__init__.py`. Su directorio `plugins` debería ser similar a lo siguiente:

   ```
   plugins/
    |-- __init__.py
    |-- dag-timezone-plugin.py
   ```

## Creación de un `plugins.zip`
<a name="samples-plugins-timezone-plugins-zip"></a>

En los siguientes pasos, se explica cómo crear `plugins.zip`. El contenido de esta muestra se puede combinar con otros complementos y binarios en un solo archivo `plugins.zip`.

1. En el símbolo del sistema, vaya hasta el directorio `plugins` del paso anterior. Por ejemplo:

   ```
   cd plugins
   ```

1. Comprima el contenido en su directorio `plugins`.

   ```
   zip -r ../plugins.zip ./
   ```

1. Carga de `plugins.zip` en el bucket S3

   ```
   aws s3 cp plugins.zip s3://your-mwaa-bucket/
   ```

## Código de ejemplo
<a name="samples-plugins-timezone-dag"></a>

Para cambiar la zona horaria predeterminada (UTC\$10) en la que se ejecuta el DAG, utilizaremos una biblioteca llamada [Pendulum](https://pypi.org/project/pendulum/), una biblioteca de Python para trabajar con una datetime y conocer la zona horaria.

1. En el símbolo del sistema, vaya hasta el directorio en el que están almacenados los DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido de la siguiente muestra y guárdelo como `tz-aware-dag.py`.

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from datetime import datetime, timedelta
   # Import the Pendulum library.
   import pendulum
   
   # Instantiate Pendulum and set your timezone.
   local_tz = pendulum.timezone("America/Los_Angeles")
   
   with DAG(
       dag_id = "tz_test",
       schedule_interval="0 12 * * *",
       catchup=False,
       start_date=datetime(2022, 1, 1, tzinfo=local_tz)
   ) as dag:
       bash_operator_task = BashOperator(
           task_id="tz_aware_task",
           dag=dag,
           bash_command="date"
       )
   ```

1.  Ejecute el siguiente comando de la AWS CLI para copiar el DAG en el bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Si se ejecuta correctamente, obtendrá un resultado similar al siguiente en los registros para `tz_aware_task` en el `tz_test` DAG:

   ```
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

## Siguientes pasos
<a name="samples-plugins-timezone-plugins-next-up"></a>
+ Obtenga más información sobre cómo cargar el archivo `plugins.zip` de este ejemplo a su bucket de Amazon S3 en [Instalación de complementos personalizados](configuring-dag-import-plugins.md).

# Actualización de un token de CodeArtifact
<a name="samples-code-artifact"></a>

Si utiliza CodeArtifact para instalar dependencias de Python, Amazon MWAA requiere un token activo. Para permitir que Amazon MWAA acceda a un repositorio de CodeArtifact en tiempo de ejecución, puede usar un [script de inicio](using-startup-script.md) y configurar [https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url](https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url) con el token.

En el siguiente tema se describe cómo crear un script de inicio que utilice la operación de la API de CodeArtifact [https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token) para recuperar un token nuevo cada vez que el entorno se inicie o se actualice.

**Topics**
+ [Versión](#samples-code-artifact-version)
+ [Requisitos previos](#samples-code-artifact-prereqs)
+ [Permisos](#samples-code-artifact-permissions)
+ [Código de ejemplo](#samples-code-artifact-code)
+ [Siguientes pasos](#samples-code-artifact-next-up)

## Versión
<a name="samples-code-artifact-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-code-artifact-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).
+ Un [repositorio de CodeArtifact](https://docs.aws.amazon.com/codeartifact/latest/ug/create-repo.html) en el que almacenar las dependencias de su entorno.

## Permisos
<a name="samples-code-artifact-permissions"></a>

Para actualizar el token CodeArtifact y escribir el resultado en Amazon S3, Amazon MWAA debe tener los siguientes permisos en el rol de ejecución.
+ La acción `codeartifact:GetAuthorizationToken` permite a Amazon MWAA recuperar un nuevo token de CodeArtifact. La siguiente política otorga permisos a todos los dominios de CodeArtifact que cree. Puede restringir aún más el acceso a sus dominios modificando el valor del recurso en la instrucción y especificando solo los dominios a los que desee que acceda su entorno.

  ```
  {
    "Effect": "Allow",
    "Action": "codeartifact:GetAuthorizationToken",
    "Resource": "arn:aws:codeartifact:us-west-2:*:domain/*"
  }
  ```
+ La acción `sts:GetServiceBearerToken` es necesaria para llamar a la operación de la API [https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html](https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html) de CodeArtifact. Esta operación devuelve el token que debe usarse cuando se usa un administrador de paquetes como `pip` con CodeArtifact. Para usar un administrador de paquetes con un repositorio de CodeArtifact, el rol de ejecución de su entorno debe permitir `sts:GetServiceBearerToken`, tal y como se muestra en la siguiente instrucción de política.

  ```
  {
    "Sid": "AllowServiceBearerToken",
    "Effect": "Allow",
    "Action": "sts:GetServiceBearerToken",
    "Resource": "*"
  }
  ```

## Código de ejemplo
<a name="samples-code-artifact-code"></a>

Los siguientes pasos describen cómo crear un script de inicio que actualice el token CodeArtifact.

1. Copie el contenido del código de ejemplo siguiente y guárdelo localmente como `code_artifact_startup_script.sh`.

   ```
   #!/bin/sh
   
   # Startup script for MWAA, refer to https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html
   
   set -eu
   
   # setup code artifact endpoint and token
   # https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0
   # https://docs.aws.amazon.com/mwaa/latest/userguide/samples-code-artifact.html
   DOMAIN="amazon"
   DOMAIN_OWNER="112233445566"
   REGION="us-west-2"
   REPO_NAME="MyRepo"
   echo "Getting token for CodeArtifact with args: --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER"
   TOKEN=$(aws codeartifact get-authorization-token --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER | jq -r '.authorizationToken')
   echo "Setting Pip env var for '--index-url' to point to CodeArtifact"
   export PIP_EXTRA_INDEX_URL="https://aws:$TOKEN@$DOMAIN-$DOMAIN_OWNER.d.codeartifact.$REGION.amazonaws.com/pypi/$REPO_NAME/simple/"
   echo "CodeArtifact startup setup complete"
   ```

1. Vaya a la carpeta en la que guardó el script. Utilice `cp` en una nueva ventana de símbolo del sistema para cargar el script en su bucket. Sustituya *amzn-s3-demo-bucket* por su información.

   ```
   aws s3 cp code_artifact_startup_script.sh s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   Si se ejecuta correctamente, Amazon S3 muestra la ruta URL del objeto:

   ```
   upload: ./code_artifact_startup_script.sh to s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   Tras cargar el script, su entorno actualiza y ejecuta el script al iniciarse.

## Siguientes pasos
<a name="samples-code-artifact-next-up"></a>
+ Aprenda a usar los scripts de inicio para personalizar su entorno en [Uso de un script de inicio con Amazon MWAA](using-startup-script.md).
+ Aprenda a cargar el código el DAG de este ejemplo en la carpeta `dags` de su bucket de Amazon S3 en [Cómo añadir o actualizar DAG](configuring-dag-folder.md).
+ Obtenga más información sobre cómo cargar el archivo `plugins.zip` de este ejemplo a su bucket de Amazon S3 en [Instalación de complementos personalizados](configuring-dag-import-plugins.md).

# Creación de un complemento personalizado con Apache Hive y Hadoop
<a name="samples-hive"></a>

Amazon MWAA extrae el contenido de un `plugins.zip` a `/usr/local/airflow/plugins`. Esto se puede utilizar para añadir archivos binarios a sus contenedores. Además, Apache Airflow ejecuta el contenido de los archivos de Python de la carpeta `plugins` en el *inicio*, lo que permite establecer y modificar las variables de entorno. En el siguiente ejemplo se explican los pasos necesarios para crear un complemento personalizado con Apache Hive y Hadoop en un entorno de Amazon Managed Workflows para Apache Airflow. Además, se puede combinar con otros complementos y binarios personalizados.

**Topics**
+ [Versión](#samples-hive-version)
+ [Requisitos previos](#samples-hive-prereqs)
+ [Permisos](#samples-hive-permissions)
+ [Requisitos](#samples-hive-dependencies)
+ [Descarga de dependencias](#samples-hive-install)
+ [Complemento personalizado](#samples-hive-plugins-code)
+ [Plugins.zip](#samples-hive-pluginszip)
+ [Código de ejemplo](#samples-hive-code)
+ [Opciones de configuración de Airflow](#samples-hive-airflow-config)
+ [Siguientes pasos](#samples-hive-next-up)

## Versión
<a name="samples-hive-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-hive-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).

## Permisos
<a name="samples-hive-permissions"></a>

No se necesitan permisos adicionales para usar el código de ejemplo de esta página.

## Requisitos
<a name="samples-hive-dependencies"></a>

Para usar el código de ejemplo de esta página, agregue las siguientes dependencias a su `requirements.txt`. Consulte [Instalación de dependencias de Python](working-dags-dependencies.md) para obtener más información.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
apache-airflow-providers-amazon[apache.hive]
```

## Descarga de dependencias
<a name="samples-hive-install"></a>

Amazon MWAA extraerá el contenido del archivo plugins.zip en `/usr/local/airflow/plugins` en cada contenedor de procesos de trabajo y programador de Amazon MWAA. Se utiliza para añadir binarios a su entorno. En los siguientes pasos se describe cómo ensamblar los archivos necesarios para el complemento personalizado.

1. En el símbolo del sistema, vaya hasta el directorio en el que desee crear el complemento. Por ejemplo:

   ```
   cd plugins
   ```

1. Descargue [Hadoop](https://hadoop.apache.org/) desde una [réplica](https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz), por ejemplo:

   ```
   wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
   ```

1. Descargue [Hive](https://hive.apache.org/) desde una [replica](https://www.apache.org/dyn/closer.cgi/hive/), por ejemplo:

   ```
   wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
   ```

1. Cree un directorio. Por ejemplo:

   ```
   mkdir hive_plugin
   ```

1. Extraiga Hadoop.

   ```
   tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
   ```

1. Extraiga Hive.

   ```
   tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin
   ```

## Complemento personalizado
<a name="samples-hive-plugins-code"></a>

Apache Airflow ejecutará el contenido de los archivos de Python en la carpeta de complementos durante el arranque. Esto se usa para establecer y modificar variables de entorno. En los siguientes pasos se describe el código de muestra del complemento personalizado.

1. En una línea de comando, vaya al directorio `hive_plugin`. Por ejemplo:

   ```
   cd hive_plugin
   ```

1. Copie el contenido del siguiente ejemplo de código y guárdelo localmente como `hive_plugin.py` en el directorio `hive_plugin`.

   ```
   from airflow.plugins_manager import AirflowPlugin
   import os
   os.environ["JAVA_HOME"]="/usr/lib/jvm/jre"
   os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0'
   os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop'
   os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin'
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" 
   class EnvVarPlugin(AirflowPlugin):                
       name = 'hive_plugin'
   ```

1. Copie el contenido del siguiente texto y guárdelo localmente como `.airflowignore` en el directorio `hive_plugin`.

   ```
   hadoop-3.3.0
   apache-hive-3.1.2-bin
   ```

## Plugins.zip
<a name="samples-hive-pluginszip"></a>

En los siguientes pasos, se explica cómo crear `plugins.zip`. El contenido de este ejemplo se puede combinar con otros complementos y archivos binarios en un solo archivo `plugins.zip`.

1. En el símbolo del sistema, vaya hasta el directorio `hive_plugin` del paso anterior. Por ejemplo:

   ```
   cd hive_plugin
   ```

1. Comprima el contenido de la carpeta `plugins`.

   ```
   zip -r ../hive_plugin.zip ./
   ```

## Código de ejemplo
<a name="samples-hive-code"></a>

En los siguientes pasos se explica cómo crear el código DAG que probará el complemento personalizado.

1. En el símbolo del sistema, vaya hasta el directorio en el que esté almacenado el código DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del código de ejemplo siguiente y guárdelo localmente como `hive.py`.

   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.utils.dates import days_ago
   
   with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hive_test = BashOperator(
           task_id="hive_test",
           bash_command='hive --help'
       )
   ```

## Opciones de configuración de Airflow
<a name="samples-hive-airflow-config"></a>

Si utiliza Apache Airflow v2, agregue `core.lazy_load_plugins : False` como opción de configuración de Apache Airflow. Para obtener más información, consulte [Uso de las opciones de configuración para cargar complementos en la versión 2](configuring-env-variables.md#configuring-2.0-airflow-override).

## Siguientes pasos
<a name="samples-hive-next-up"></a>
+ Aprenda a cargar el archivo `requirements.txt` de este ejemplo a su bucket de Amazon S3 en [Instalación de dependencias de Python](working-dags-dependencies.md).
+ Aprenda a cargar el código el DAG de este ejemplo en la carpeta `dags` de su bucket de Amazon S3 en [Cómo añadir o actualizar DAG](configuring-dag-folder.md).
+ Obtenga más información sobre cómo cargar el archivo `plugins.zip` de este ejemplo a su bucket de Amazon S3 en [Instalación de complementos personalizados](configuring-dag-import-plugins.md).

# Creación de un complemento personalizado para Apache Airflow PythonVirtualEnvOperator
<a name="samples-virtualenv"></a>

En el siguiente ejemplo, se muestra cómo parchear `PythonVirtualenvOperator` de Apache Airflow con un complemento personalizado en Amazon Managed Workflows para Apache Airflow.

**Topics**
+ [Versión](#samples-virtualenv-version)
+ [Requisitos previos](#samples-virtualenv-prereqs)
+ [Permisos](#samples-virtualenv-permissions)
+ [Requisitos](#samples-virtualenv-dependencies)
+ [Código de muestra de complemento personalizado](#samples-virtualenv-plugins-code)
+ [Plugins.zip](#samples-virtualenv-pluginszip)
+ [Código de ejemplo](#samples-virtualenv-code)
+ [Opciones de configuración de Airflow](#samples-virtualenv-airflow-config)
+ [Siguientes pasos](#samples-virtualenv-next-up)

## Versión
<a name="samples-virtualenv-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-virtualenv-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).

## Permisos
<a name="samples-virtualenv-permissions"></a>

No se necesitan permisos adicionales para usar el código de ejemplo de esta página.

## Requisitos
<a name="samples-virtualenv-dependencies"></a>

Para usar el código de ejemplo de esta página, agregue las siguientes dependencias a su `requirements.txt`. Consulte [Instalación de dependencias de Python](working-dags-dependencies.md) para obtener más información.

```
virtualenv
```

## Código de muestra de complemento personalizado
<a name="samples-virtualenv-plugins-code"></a>

Apache Airflow ejecutará el contenido de los archivos de Python en la carpeta de complementos durante el arranque. Este complemento parcheará la versión integrada `PythonVirtualenvOperator` durante el proceso de arranque para que sea compatible con Amazon MWAA. En los siguientes pasos, se describe el código de muestra del complemento personalizado.

1. En el símbolo del sistema, vaya hasta el directorio `plugins` del paso anterior. Por ejemplo:

   ```
   cd plugins
   ```

1. Copie el contenido del código de ejemplo siguiente y guárdelo localmente como `virtual_python_plugin.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow.plugins_manager import AirflowPlugin
   import airflow.utils.python_virtualenv 
   from typing import List
   
   def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
       cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
       if system_site_packages:
           cmd.append('--system-site-packages')
       if python_bin is not None:
           cmd.append(f'--python={python_bin}')
       return cmd
   
   airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
   
   class VirtualPythonPlugin(AirflowPlugin):                
       name = 'virtual_python_plugin'
   ```

## Plugins.zip
<a name="samples-virtualenv-pluginszip"></a>

En los siguientes pasos, se muestra cómo crear el `plugins.zip`.

1. En el símbolo del sistema, vaya hasta el directorio que contiene `virtual_python_plugin.py` del paso anterior. Por ejemplo:

   ```
   cd plugins
   ```

1. Comprima el contenido de la carpeta `plugins`.

   ```
   zip plugins.zip virtual_python_plugin.py
   ```

## Código de ejemplo
<a name="samples-virtualenv-code"></a>

Los siguientes pasos describen cómo crear el código de DAG para el complemento personalizado.

1. En el símbolo del sistema, vaya hasta el directorio en el que esté almacenado el código DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del código de ejemplo siguiente y guárdelo localmente como `virtualenv_test.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.operators.python import PythonVirtualenvOperator
   from airflow.utils.dates import days_ago
   import os
   
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"
   
   def virtualenv_fn():
       import boto3
       print("boto3 version ",boto3.__version__)
   
   with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       virtualenv_task = PythonVirtualenvOperator(
           task_id="virtualenv_task",
           python_callable=virtualenv_fn,
           requirements=["boto3>=1.17.43"],
           system_site_packages=False,
           dag=dag,
       )
   ```

## Opciones de configuración de Airflow
<a name="samples-virtualenv-airflow-config"></a>

Si utiliza Apache Airflow v2, agregue `core.lazy_load_plugins : False` como opción de configuración de Apache Airflow. Para obtener más información, consulte [Uso de las opciones de configuración para cargar complementos en la versión 2](configuring-env-variables.md#configuring-2.0-airflow-override).

## Siguientes pasos
<a name="samples-virtualenv-next-up"></a>
+ Aprenda a cargar el archivo `requirements.txt` de este ejemplo a su bucket de Amazon S3 en [Instalación de dependencias de Python](working-dags-dependencies.md).
+ Aprenda a cargar el código el DAG de este ejemplo en la carpeta `dags` de su bucket de Amazon S3 en [Cómo añadir o actualizar DAG](configuring-dag-folder.md).
+ Obtenga más información sobre cómo cargar el archivo `plugins.zip` de este ejemplo a su bucket de Amazon S3 en [Instalación de complementos personalizados](configuring-dag-import-plugins.md).

# Invocar DAGs con una función Lambda
<a name="samples-lambda"></a>

El siguiente código de ejemplo utiliza una función [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html) para obtener un token CLI de Apache Airflow e invocar un gráfico acíclico dirigido (DAG) en un entorno de Amazon MWAA.

**Topics**
+ [Versión](#samples-lambda-version)
+ [Requisitos previos](#samples-lambda-prereqs)
+ [Permisos](#samples-lambda-permissions)
+ [Dependencias](#samples-lambda-dependencies)
+ [Ejemplo de código](#samples-lambda-code)

## Versión
<a name="samples-lambda-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-lambda-prereqs"></a>

Para utilizar este ejemplo de código, debe:
+ Utilizar el [modo de acceso a la red pública](configuring-networking.md#webserver-options-public-network-onconsole) para su [entorno Amazon MWAA](get-started.md).
+ Tener una [función de Lambda](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html) utilizando el último tiempo de ejecución de Python.

**nota**  
Si la función de Lambda y su entorno Amazon MWAA están en la misma VPC, puede usar este código en una red privada. Para esta configuración, el rol de ejecución de la función de Lambda necesita permiso para llamar a la operación de la API de **CreateNetworkInterface** de Amazon Elastic Compute Cloud (Amazon EC2). Puede proporcionar este permiso mediante la política [https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor](https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor) AWS administrada.

## Permisos
<a name="samples-lambda-permissions"></a>

Para usar el ejemplo de código de esta página, el rol de ejecución de su entorno Amazon MWAA necesita acceso para realizar la acción `airflow:CreateCliToken`. Puedes conceder este permiso mediante la política `AmazonMWAAAirflowCliAccess` AWS-gestionada:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Para obtener más información, consulta [Política de CLI de Apache Airflow: Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Dependencias
<a name="samples-lambda-dependencies"></a>

Para usar este código de ejemplo con Apache Airflow v2 y versiones posteriores, no se necesitan dependencias adicionales. Se usa [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)para instalar Apache Airflow.

## Ejemplo de código
<a name="samples-lambda-code"></a>

1. Abra la AWS Lambda consola en. [https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/)

1. Elija su función de Lambda en la lista de **funciones**.

1. En la página de funciones, copie el código siguiente y sustituya lo siguiente por los nombres de sus recursos:
   + `YOUR_ENVIRONMENT_NAME`: el nombre del entorno de Amazon MWAA.
   + `YOUR_DAG_NAME`: el nombre del DAG que desea invocar.

   ```
   import boto3
   import http.client
   import base64
   import ast
   mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
   dag_name = 'YOUR_DAG_NAME'
   mwaa_cli_command = 'dags trigger'
   ​
   client = boto3.client('mwaa')
   ​
   def lambda_handler(event, context):
       # get web token
       mwaa_cli_token = client.create_cli_token(
           Name=mwaa_env_name
       )
       
       conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
       payload = mwaa_cli_command + " " + dag_name
       headers = {
         'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
         'Content-Type': 'text/plain'
       }
       conn.request("POST", "/aws_mwaa/cli", payload, headers)
       res = conn.getresponse()
       data = res.read()
       dict_str = data.decode("UTF-8")
       mydata = ast.literal_eval(dict_str)
       return base64.b64decode(mydata['stdout'])
   ```

1. Elija **Implementar**.

1. Elija **Probar** para invocar la función mediante la consola Lambda.

1. Para comprobar que su Lambda ha invocado correctamente su DAG, utilice la consola Amazon MWAA para navegar hasta la interfaz de usuario de Apache Airflow de su entorno y, a continuación, haga lo siguiente:

   1. En la **DAGs**página, busque su nuevo DAG de destino en la lista de DAGs.

   1. En **Última ejecución**, compruebe la marca de tiempo de la última ejecución del DAG. Esta marca de tiempo debe acercarse lo máximo posible a la última marca de tiempo para `invoke_dag` en su otro entorno.

   1. En **Tareas recientes**, compruebe que la última ejecución se haya realizado correctamente.

# Invocación DAGs en diferentes entornos de Amazon MWAA
<a name="samples-invoke-dag"></a>

El siguiente código de ejemplo crea un token CLI de Apache Airflow. A continuación, el código utiliza un gráfico acíclico dirigido (DAG) en un entorno de Amazon MWAA para invocar un DAG en un entorno distinto de Amazon MWAA.

**Topics**
+ [Versión](#samples-invoke-dag-version)
+ [Requisitos previos](#samples-invoke-dag-prereqs)
+ [Permisos](#samples-invoke-dag-permissions)
+ [Dependencias](#samples-invoke-dag-dependencies)
+ [Ejemplo de código](#samples-invoke-dag-code)

## Versión
<a name="samples-invoke-dag-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-invoke-dag-prereqs"></a>

Para usar el ejemplo de código en esta página, necesitará lo siguiente:
+ Dos [entornos de Amazon MWAA](get-started.md) con acceso a un servidor web de **red pública**, incluido su entorno actual.
+ Un DAG de muestra cargado en el bucket de Amazon Simple Storage Service (Amazon S3) de su entorno de destino.

## Permisos
<a name="samples-invoke-dag-permissions"></a>

Para usar el código de ejemplo de esta página, el rol de ejecución de su entorno debe tener permiso para crear un token CLI de Apache Airflow. Puedes adjuntar la política AWS gestionada `AmazonMWAAAirflowCliAccess` para conceder este permiso.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Para obtener más información, consulta [Política de CLI de Apache Airflow: Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Dependencias
<a name="samples-invoke-dag-dependencies"></a>

Para usar este código de ejemplo con Apache Airflow v2 y versiones posteriores, no se necesitan dependencias adicionales. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)Utilícelo para instalar Apache Airflow.

## Ejemplo de código
<a name="samples-invoke-dag-code"></a>

En el siguiente código de ejemplo se supone que está utilizando un DAG en su entorno actual para invocar un DAG en otro entorno.

1. En su terminal, vaya hasta el directorio en el que está almacenado el código de DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del siguiente código de ejemplo y guárdelo localmente como `invoke_dag.py`. Reemplace los valores siguientes por sus propios valores.
   + `your-new-environment-name`: nombre del otro entorno en el que desea invocar el DAG.
   + `your-target-dag-id`: ID del DAG del otro entorno que desea invocar.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  Ejecute el siguiente AWS CLI comando para copiar el DAG en el bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Si el DAG se ejecuta correctamente, verá un resultado similar al siguiente en los registros de tareas de `invoke_dag_task`.

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Para comprobar que el DAG se haya invocado correctamente, vaya a la interfaz de usuario de Apache Airflow del nuevo entorno y, a continuación, haga lo siguiente:

   1. En la **DAGs**página, busque su nuevo DAG de destino en la lista de DAGs.

   1. En **Última ejecución**, compruebe la marca de tiempo de la última ejecución del DAG. Esta marca de tiempo debe acercarse lo máximo posible a la última marca de tiempo para `invoke_dag` en su otro entorno.

   1. En **Tareas recientes**, compruebe que la última ejecución se haya realizado correctamente.

# Uso de Amazon MWAA con Amazon RDS para Microsoft SQL Server
<a name="samples-sql-server"></a>

Puede utilizar Amazon Managed Workflows para Apache Airflow para conectarse a un [RDS para SQL Server](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_SQLServer.html). El siguiente código de ejemplo utiliza los DAG de un entorno Amazon Managed Workflows para Apache Airflow para conectarse a un servidor Amazon RDS para Microsoft SQL Server y ejecutar consultas en él.

**Topics**
+ [Versión](#samples-sql-server-version)
+ [Requisitos previos](#samples-sql-server-prereqs)
+ [Dependencias](#samples-sql-server-dependencies)
+ [Conexión Apache Airflow v2](#samples-sql-server-conn)
+ [Código de ejemplo](#samples-sql-server-code)
+ [Siguientes pasos](#samples-sql-server-next-up)

## Versión
<a name="samples-sql-server-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-sql-server-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).
+ Amazon MWAA y el RDS para SQL Server deben ejecutarse en la misma Amazon VPC.
+ Los grupos de seguridad de VPC de Amazon MWAA y el servidor deben configurarse con las siguientes conexiones:
  + Una regla de entrada para el puerto `1433` abierto para Amazon RDS en el grupo de seguridad de Amazon MWAA
  + O una regla de salida para el puerto `1433` abierto de Amazon MWAA a RDS
+ Apache Airflow Connection para RDS para SQL Server refleja el nombre de host, el puerto, el nombre de usuario y la contraseña de la base de datos del servidor SQL de Amazon RDS creada en el proceso anterior.

## Dependencias
<a name="samples-sql-server-dependencies"></a>

Para usar el código de ejemplo de esta sección, agregue la siguiente dependencia a su `requirements.txt`. Consulte [Instalación de dependencias de Python](working-dags-dependencies.md) para obtener más información.

```
apache-airflow-providers-microsoft-mssql==1.0.1
			apache-airflow-providers-odbc==1.0.1
			pymssql==2.2.1
```

## Conexión Apache Airflow v2
<a name="samples-sql-server-conn"></a>

Si utiliza una conexión en Apache Airflow v2, asegúrese de que el objeto de conexión Airflow incluya los siguientes pares clave-valor:

1. **ID de conexión: ** mssql\$1default

1. **Tipo de conexión: ** Amazon Web Services

1. **Host: ** `YOUR_DB_HOST`

1. **Esquema: **

1. **Inicio de sesión: **admin

1. **Contraseña: **

1. **Puerto: **1433

1. **Extra: **

## Código de ejemplo
<a name="samples-sql-server-code"></a>

1. En el símbolo del sistema, vaya hasta el directorio en el que esté almacenado el código DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del código de ejemplo siguiente y guárdelo localmente como `sql-server.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   import pymssql
   import logging
   import sys
   from airflow import DAG
   from datetime import datetime
   from airflow.operators.mssql_operator import MsSqlOperator
   from airflow.operators.python_operator import PythonOperator
   
   default_args = {
       'owner': 'aws',
       'depends_on_past': False,
       'start_date': datetime(2019, 2, 20),
       'provide_context': True
   }
   
   dag = DAG(
       'mssql_conn_example', default_args=default_args, schedule_interval=None)
       
   drop_db = MsSqlOperator(
      task_id="drop_db",
      sql="DROP DATABASE IF EXISTS testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_db = MsSqlOperator(
      task_id="create_db",
      sql="create database testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_table = MsSqlOperator(
      task_id="create_table",
      sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   insert_into_table = MsSqlOperator(
      task_id="insert_into_table",
      sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   def select_pet(**kwargs):
      try:
           conn = pymssql.connect(
               server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com',
               user='admin',
               password='<yoursupersecretpassword>',
               database='testdb'
           )
           
           # Create a cursor from the connection
           cursor = conn.cursor()
           cursor.execute("SELECT * from testdb.dbo.pet")
           row = cursor.fetchone()
           
           if row:
               print(row)
      except:
         logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0])
   
   select_query = PythonOperator(
       task_id='select_query',
       python_callable=select_pet,
       dag=dag,
   )
   
   drop_db >> create_db >> create_table >> insert_into_table >> select_query
   ```

## Siguientes pasos
<a name="samples-sql-server-next-up"></a>
+ Aprenda a cargar el archivo `requirements.txt` de este ejemplo a su bucket de Amazon S3 en [Instalación de dependencias de Python](working-dags-dependencies.md).
+ Aprenda a cargar el código el DAG de este ejemplo en la carpeta `dags` de su bucket de Amazon S3 en [Cómo añadir o actualizar DAG](configuring-dag-folder.md).
+ Explore ejemplos de scripts y otros [ejemplos de módulos pymssql](https://pymssql.readthedocs.io/en/stable/pymssql_examples.html).
+ Obtenga más información sobre la ejecución de código SQL en una base de datos Microsoft SQL específica mediante [mssql\$1operator](https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/mssql_operator/index.html?highlight=mssqloperator#airflow.operators.mssql_operator.MsSqlOperator) en la *guía de referencia de Apache Airflow*.

# Uso de imágenes de Amazon ECR con Amazon EKS
<a name="mwaa-eks-example"></a>

En el siguiente ejemplo se muestra cómo usar Amazon Managed Workflows para Apache Airflow con Amazon EKS.

**Topics**
+ [Versión](#mwaa-eks-example-version)
+ [Requisitos previos](#eksctl-prereqs)
+ [Creación de una clave pública para Amazon EC2](#eksctl-create-key)
+ [Cree el clúster](#create-cluster-eksctl)
+ [Creación de un espacio de nombres de `mwaa`](#eksctl-namespace)
+ [Creación de un rol para el espacio de nombres de `mwaa`](#eksctl-role)
+ [Creación del rol de IAM del clúster de Amazon EKS](#eksctl-iam-role)
+ [Creación del archivo requirements.txt](#eksctl-requirements)
+ [Creación de un mapeo de identidad para Amazon EKS](#eksctl-identity-map)
+ [Creación del `kubeconfig`](#eksctl-kube-config)
+ [Creación de un DAG](#eksctl-create-dag)
+ [Adición del DAG y `kube_config.yaml` al bucket de Amazon S3](#eksctl-dag-bucket)
+ [Habilitación y activación del ejemplo](#eksctl-trigger-pod)

## Versión
<a name="mwaa-eks-example-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="eksctl-prereqs"></a>

Para usar el ejemplo de este tema, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).
+ eksctl. Para obtener más información, consulte cómo [instalar eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html#install-eksctl).
+ kubectl. Para obtener más información, consulte cómo [instalar y configurar kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/). En algunos casos, se instala con eksctl.
+ Un par de claves de EC2 en la región donde se crea su entorno de Amazon MWAA. Para obtener más información, consulte cómo [crear o importar un par de claves](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair).

**nota**  
Cuando utilice un comando `eksctl`, puede incluir un `--profile` para especificar un perfil distinto del predeterminado.

## Creación de una clave pública para Amazon EC2
<a name="eksctl-create-key"></a>

Utilice el siguiente comando para crear una clave pública a partir de su clave privada.

```
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
```

Para obtener más información, consulte cómo [recuperar la clave pública de su par de claves](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#retrieving-the-public-key).

## Cree el clúster
<a name="create-cluster-eksctl"></a>

Utilice el siguiente comando para crear el clúster. Si desea usar un nombre personalizado para el clúster o crearlo en una región diferente, sustituya los valores del nombre y la región. Debe crear el clúster en la misma región en la que haya creado el entorno de Amazon MWAA. Sustituya los valores de las subredes para que coincidan con las subredes de la red de Amazon VPC que utilice para Amazon MWAA. Sustituya el valor por `ssh-public-key` para que coincida con la clave que utilice. Puede utilizar una clave existente de Amazon EC2 que se encuentre en la misma región o crear una nueva en la misma región donde creó su entorno de Amazon MWAA.

```
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey \
--managed \
--vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \
--vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"
```

La creación del clúster tarde un tiempo en completarse. Una vez que termine el proceso, puede comprobar que el clúster se haya creado correctamente y que tiene el proveedor OIDC de IAM configurado mediante el siguiente comando:

```
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
```

## Creación de un espacio de nombres de `mwaa`
<a name="eksctl-namespace"></a>

Tras confirmar que el clúster se ha creado correctamente, utilice el siguiente comando para crear un espacio de nombres para los pods.

```
kubectl create namespace mwaa
```

## Creación de un rol para el espacio de nombres de `mwaa`
<a name="eksctl-role"></a>

Tras crear el espacio de nombres, cree un rol y un enlace de rol para un usuario de Amazon MWAA en EKS que pueda ejecutar pods en un espacio de nombres de MWAA. Si utilizó un nombre diferente para el espacio de nombres, reemplace mwaa en `-n mwaa` por el nombre que usó.

```
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: mwaa-role
rules:
  - apiGroups:
  - ""
  - "apps"
  - "batch"
  - "extensions"
resources:      
  - "jobs"
  - "pods"
  - "pods/attach"
			- "pods/exec"
  - "pods/log"
  - "pods/portforward"
  - "secrets"
  - "services"
verbs:
  - "create"
  - "delete"
  - "describe"
  - "get"
  - "list"
  - "patch"
  - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
  subjects:
    - kind: User
  name: mwaa-service
  roleRef:
    kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
```

Confirme que el nuevo rol puede acceder al clúster de Amazon EKS ejecutando el siguiente comando. Asegúrese de usar el nombre correcto si no usó*mwaa*:

```
kubectl get pods -n mwaa --as mwaa-service
```

Deberá aparecer un mensaje que indique lo siguiente:

```
No resources found in mwaa namespace.
```

## Creación del rol de IAM del clúster de Amazon EKS
<a name="eksctl-iam-role"></a>

Debe crear un rol de IAM y, a continuación, vincularlo al clúster de Amazon EKS (k8s) para que se pueda usar para la autenticación a través de IAM. El rol solo se usa para iniciar sesión en el clúster y no tiene ningún permiso en lo que respecta a la consola o las llamadas a la API.

Cree un nuevo rol para el entorno de Amazon MWAA siguiendo los pasos que se indican en [Rol de ejecución de Amazon MWAA](mwaa-create-role.md). Sin embargo, en lugar de crear y adjuntar las políticas descritas en ese tema, adjunte la siguiente política:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:environment/${MWAA_ENV_NAME}"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
            "arn:aws:logs:us-east-1:111122223333:log-group:airflow-${MWAA_ENV_NAME}-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111122223333:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                    "sqs.us-east-1.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:us-east-1:111122223333:cluster/${EKS_CLUSTER_NAME}"
        }
    ]
}
```

------

Una vez que haya creado el rol, edite el entorno de Amazon MWAA para usar el rol que haya creado como rol de ejecución para el entorno. Para cambiar el rol, edite el entorno que se vaya a utilizar. Seleccione el rol de ejecución en **Permisos**.

**Problemas conocidos:**
+ Existe un problema conocido relacionado con la función de que las subrutas no se pueden autenticar ARNs con Amazon EKS. La solución consiste en crear el rol de servicio de forma manual, en lugar de utilizar el que creó Amazon MWAA. Para obtener más información, consulte los [roles con rutas que no funcionan cuando la ruta está incluida en su ARN en el mapa de configuración de aws-auth](https://github.com/kubernetes-sigs/aws-iam-authenticator/issues/268)
+ Si la lista de servicios de Amazon MWAA no está disponible en IAM, debe elegir una política de servicio alternativa, como Amazon EC2, y, a continuación, actualizar la política de confianza del rol para que coincida con lo siguiente:

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": [
            "airflow-env.amazonaws.com",
            "airflow.amazonaws.com"
          ]
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }
  ```

------

  Para obtener más información, consulte [cómo usar las políticas de confianza con roles de IAM](https://aws.amazon.com/blogs/security/how-to-use-trust-policies-with-iam-roles/).

## Creación del archivo requirements.txt
<a name="eksctl-requirements"></a>

Para usar el código de ejemplo de esta sección, compruebe que ha añadido una de las siguientes opciones de base de datos a sus `requirements.txt`. Consulte [Instalación de dependencias de Python](working-dags-dependencies.md) para obtener más información.

```
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
```

## Creación de un mapeo de identidad para Amazon EKS
<a name="eksctl-identity-map"></a>

Utilice el ARN del rol que creó en el siguiente comando para crear un mapeo de identidad para Amazon EKS. Cambie la región por *us-east-1* la región en la que creó el entorno. Sustituya el ARN de la función y, por último, *mwaa-execution-role* sustitúyala por la función de ejecución de su entorno.

```
eksctl create iamidentitymapping \
--region us-east-1 \
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012:role/mwaa-execution-role \
--username mwaa-service
```

## Creación del `kubeconfig`
<a name="eksctl-kube-config"></a>

Utilice el siguiente comando para crear el rol `kubeconfig`:

```
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
```

Si utilizó un perfil específico al ejecutar `update-kubeconfig`, tendrá que eliminar la sección `env:` añadida al archivo kube\$1config.yaml para que funcione correctamente con Amazon MWAA. Para ello, elimine lo siguiente del archivo y guárdelo:

```
env:
 - name: AWS_PROFILE
 value: profile_name
```

## Creación de un DAG
<a name="eksctl-create-dag"></a>

Utilice el siguiente ejemplo de código para crear un archivo de Python, por ejemplo, `mwaa_pod_example.py` para el DAG.

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,
                       config_file=kube_config_path,
                       in_cluster=False,
                       cluster_context='aws'
                       )
```

## Adición del DAG y `kube_config.yaml` al bucket de Amazon S3
<a name="eksctl-dag-bucket"></a>

Coloque el DAG que creó y el archivo `kube_config.yaml` en el bucket de Amazon S3 para el entorno de Amazon MWAA. Puede colocar los archivos en el bucket a través de la consola de Amazon S3 o el AWS Command Line Interface.

## Habilitación y activación del ejemplo
<a name="eksctl-trigger-pod"></a>

En Apache Airflow, habilite el ejemplo y actívelo.

Cuando se ejecute y se complete correctamente, utilice el siguiente comando para verificar el pod:

```
kubectl get pods -n mwaa
```

Obtendrá un resultado similar al siguiente:

```
NAME READY STATUS RESTARTS AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s
```

A continuación, puede verificar la salida del pod con el siguiente comando. Reemplace el nombre del valor por el valor devuelto por el comando anterior:

```
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888
```

# Conexión a Amazon ECS mediante el `ECSOperator`
<a name="samples-ecs-operator"></a>

En esta página, se describe cómo puede usar el `ECSOperator` para conectarse a un contenedor de Amazon Elastic Container Service (Amazon ECS) desde Amazon MWAA. En los siguientes pasos, añadirá los permisos necesarios a la función de ejecución de su entorno, utilizará una CloudFormation plantilla para crear un clúster Fargate de Amazon ECS y, por último, creará y cargará un DAG que se conecte a su nuevo clúster.

**Topics**
+ [Versión](#samples-ecs-operator-version)
+ [Requisitos previos](#samples-ecs-operator-prereqs)
+ [Permisos](#samples-ecs-operator-permissions)
+ [Crear un clúster de Amazon ECS](#create-cfn-template)
+ [Código de ejemplo](#samples-ecs-operator-code)

## Versión
<a name="samples-ecs-operator-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-ecs-operator-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).

## Permisos
<a name="samples-ecs-operator-permissions"></a>
+ El rol de ejecución de su entorno necesita permiso para ejecutar tareas en Amazon ECS. Puede adjuntar la política FullAccess AWS administrada por [AmazonECS\$1](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonECS_FullAccess$jsonEditor) a su función de ejecución o crear y adjuntar la siguiente política a su función de ejecución.

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "ecs:RunTask",
                  "ecs:DescribeTasks"
              ],
              "Resource": "*"
          },
          {
              "Action": "iam:PassRole",
              "Effect": "Allow",
              "Resource": [
                  "*"
              ],
              "Condition": {
                  "StringLike": {
                      "iam:PassedToService": "ecs-tasks.amazonaws.com"
                  }
              }
          }
      ]
  }
  ```

------
+ Además de añadir los permisos necesarios para ejecutar tareas en Amazon ECS, también debe modificar la declaración de política de Logs en su función de ejecución de Amazon MWAA para permitir el acceso al grupo de CloudWatch registros de tareas de Amazon ECS, tal y como se indica a continuación. El grupo de registros de Amazon ECS lo crea la CloudFormation plantilla en[Crear un clúster de Amazon ECS](#create-cfn-template).

  ```
  {
    "Effect": "Allow",
    "Action": [
      "logs:CreateLogStream",
      "logs:CreateLogGroup",
      "logs:PutLogEvents",
      "logs:GetLogEvents",
      "logs:GetLogRecord",
      "logs:GetLogGroupFields",
      "logs:GetQueryResults"
    ],
    "Resource": [
      "arn:aws:logs:us-east-1:123456789012:log-group:airflow-environment-name-*",
      "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*"
    ]
  }
  ```

Para obtener más información sobre el rol de ejecución de Amazon MWAA y sobre cómo adjuntar una política, consulte [Rol de ejecución](mwaa-create-role.md).

## Crear un clúster de Amazon ECS
<a name="create-cfn-template"></a>

Con la siguiente CloudFormation plantilla, creará un clúster Fargate de Amazon ECS para usarlo con su flujo de trabajo de Amazon MWAA. Para obtener más información, consulte cómo [crear una definición de tarea](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-task-definition) en la *guía para desarrolladores de Amazon Elastic Container Service*.

1. Cree un archivo JSON con el siguiente contenido y guárdelo como `ecs-mwaa-cfn.json`.

   ```
   {
       "AWSTemplateFormatVersion": "2010-09-09",
       "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.",
       "Parameters": {
           "VpcId": {
               "Type": "AWS::EC2::VPC::Id",
               "Description": "Select a VPC that allows instances access to ECR, as used with MWAA."
           },
           "SubnetIds": {
               "Type": "List<AWS::EC2::Subnet::Id>",
               "Description": "Select at two private subnets in your selected VPC, as used with MWAA."
           },
           "SecurityGroups": {
               "Type": "List<AWS::EC2::SecurityGroup::Id>",
               "Description": "Select at least one security group in your selected VPC, as used with MWAA."
           }
       },
       "Resources": {
           "Cluster": {
               "Type": "AWS::ECS::Cluster",
               "Properties": {
                   "ClusterName": {
                       "Fn::Sub": "${AWS::StackName}-cluster"
                   }
               }
           },
           "LogGroup": {
               "Type": "AWS::Logs::LogGroup",
               "Properties": {
                   "LogGroupName": {
                       "Ref": "AWS::StackName"
                   },
                   "RetentionInDays": 30
               }
           },
           "ExecutionRole": {
               "Type": "AWS::IAM::Role",
               "Properties": {
                   "AssumeRolePolicyDocument": {
                       "Statement": [
                           {
                               "Effect": "Allow",
                               "Principal": {
                                   "Service": "ecs-tasks.amazonaws.com"
                               },
                               "Action": "sts:AssumeRole"
                           }
                       ]
                   },
                   "ManagedPolicyArns": [
                       "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
                   ]
               }
           },
           "TaskDefinition": {
               "Type": "AWS::ECS::TaskDefinition",
               "Properties": {
                   "Family": {
                       "Fn::Sub": "${AWS::StackName}-task"
                   },
                   "Cpu": 2048,
                   "Memory": 4096,
                   "NetworkMode": "awsvpc",
                   "ExecutionRoleArn": {
                       "Ref": "ExecutionRole"
                   },
                   "ContainerDefinitions": [
                       {
                           "Name": {
                               "Fn::Sub": "${AWS::StackName}-container"
                           },
                           "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
                           "PortMappings": [
                               {
                                   "Protocol": "tcp",
                                   "ContainerPort": 8080,
                                   "HostPort": 8080
                               }
                           ],
                           "LogConfiguration": {
                               "LogDriver": "awslogs",
                               "Options": {
                                   "awslogs-region": {
                                       "Ref": "AWS::Region"
                                   },
                                   "awslogs-group": {
                                       "Ref": "LogGroup"
                                   },
                                   "awslogs-stream-prefix": "ecs"
                               }
                           }
                       }
                   ],
                   "RequiresCompatibilities": [
                       "FARGATE"
                   ]
               }
           },
           "Service": {
               "Type": "AWS::ECS::Service",
               "Properties": {
                   "ServiceName": {
                       "Fn::Sub": "${AWS::StackName}-service"
                   },
                   "Cluster": {
                       "Ref": "Cluster"
                   },
                   "TaskDefinition": {
                       "Ref": "TaskDefinition"
                   },
                   "DesiredCount": 1,
                   "LaunchType": "FARGATE",
                   "PlatformVersion": "1.3.0",
                   "NetworkConfiguration": {
                       "AwsvpcConfiguration": {
                           "AssignPublicIp": "ENABLED",
                           "Subnets": {
                               "Ref": "SubnetIds"
                           },
                           "SecurityGroups": {
                               "Ref": "SecurityGroups"
                           }
                       }
                   }
               }
           }
       }
   }
   ```

1. En la línea de comandos, utilice el siguiente AWS CLI comando para crear una pila nueva. Debe reemplazar los valores `SecurityGroups` y `SubnetIds` por los valores de los grupos de seguridad y las subredes de su entorno de Amazon MWAA.

   ```
   aws cloudformation create-stack \
   --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \
   --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \
   ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \
   --capabilities CAPABILITY_IAM
   ```

   También puede utilizar el siguiente script de shell: El script recupera los valores necesarios para los grupos de seguridad y las subredes de su entorno mediante el `[get-environment](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/mwaa/get-environment.html)` AWS CLI comando y, a continuación, crea la pila en consecuencia. Para ejecutar el script, siga los pasos que se detallan a continuación.

   1. Copie y guarde el script `ecs-stack-helper.sh` en el mismo directorio que la plantilla. CloudFormation 

      ```
      #!/bin/bash
      
      joinByString() {
        local separator="$1"
        shift
        local first="$1"
        shift
        printf "%s" "$first" "${@/#/$separator}"
      }
      
      response=$(aws mwaa get-environment --name $1)
      
      securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]')
      subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]'))
      
      aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \
      --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \
      ParameterKey=SubnetIds,ParameterValue=$subnetIds \
      --capabilities CAPABILITY_IAM
      ```

   1. Ejecute el script mediante los comandos siguientes. Reemplace el `environment-name` y el `stack-name` con su información.

      ```
      chmod +x ecs-stack-helper.sh
      ./ecs-stack-helper.bash environment-name stack-name
      ```

   Si se ejecuta correctamente, consultarás el siguiente resultado que muestra tu nueva ID de CloudFormation pila.

   ```
   {
     "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
   }
   ```

Una vez que la CloudFormation pila esté completa y se AWS hayan aprovisionado los recursos de Amazon ECS, estará listo para crear y cargar su DAG.

## Código de ejemplo
<a name="samples-ecs-operator-code"></a>

1. Abra un símbolo del sistema y vaya hasta el directorio en el que está almacenado el código DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del siguiente código de ejemplo y guárdelo localmente como `mwaa-ecs-operator.py`; a continuación, cargue su nuevo DAG a Amazon S3.

   ```
   from http import client
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.ecs import ECSOperator
   from airflow.utils.dates import days_ago
   import boto3
   
   CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information.
   CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information.
   LAUNCH_TYPE="FARGATE"
   
   with DAG(
       dag_id = "ecs_fargate_dag",
       schedule_interval=None,
       catchup=False,
       start_date=days_ago(1)
   ) as dag:
       client=boto3.client('ecs')
       services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE)
       service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns'])
   
       ecs_operator_task = ECSOperator(
           task_id = "ecs_operator_task",
           dag=dag,
           cluster=CLUSTER_NAME,
           task_definition=service['services'][0]['taskDefinition'],
           launch_type=LAUNCH_TYPE,
           overrides={
               "containerOverrides":[
                   {
                       "name":CONTAINER_NAME,
                       "command":["ls", "-l", "/"],
                   },
               ],
           },
   
           network_configuration=service['services'][0]['networkConfiguration'],
           awslogs_group="mwaa-ecs-zero",
           awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}",
       )
   ```
**nota**  
En el ejemplo del DAG, para `awslogs_group`, puede que tenga que modificar el grupo de registro con el nombre de su grupo de registro de tareas de Amazon ECS. El ejemplo asume un grupo de registro denominado “`mwaa-ecs-zero`”. Para `awslogs_stream_prefix`, utilice el prefijo del flujo de registro de tareas de Amazon ECS. El ejemplo asume un prefijo de flujo de registro, `ecs`.

1.  Ejecute el siguiente AWS CLI comando para copiar el DAG al bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Si se ejecuta correctamente, verá un resultado similar al siguiente en los registros de tareas del `ecs_operator_task` en el DAG `ecs_fargate_dag`:

   ```
   [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
   Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
   [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
   {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
   [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed
   ```

# Uso de dbt con Amazon MWAA
<a name="samples-dbt"></a>

En este tema se muestra cómo puede utilizar dbt y Postgres con Amazon MWAA. En los siguientes pasos, añadirá las dependencias necesarias a su `requirements.txt` y cargará un ejemplo de proyecto de dbt en el bucket de Amazon S3 de su entorno. A continuación, utilizará un ejemplo de DAG para comprobar que Amazon MWAA ha instalado las dependencias y, por último, utilizará el `BashOperator` para ejecutar el proyecto dbt.

**Topics**
+ [Versión](#samples-dbt-version)
+ [Requisitos previos](#samples-dbt-prereqs)
+ [Dependencias](#samples-dbt-dependencies)
+ [Carga de un proyecto de dbt en Amazon S3](#samples-dbt-upload-project)
+ [Uso de un DAG para verificar la instalación de la dependencia de dbt](#samples-dbt-test-dependencies)
+ [Uso de un DAG para ejecutar un proyecto dbt](#samples-dbt-run-project)

## Versión
<a name="samples-dbt-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-dbt-prereqs"></a>

Para completar los siguientes pasos, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md) que utilice Apache Airflow v2.2.2. Este ejemplo se escribió y probó con la versión 2.2.2. Es posible que tenga que modificar el ejemplo para usarlo con otras versiones de Apache Airflow.
+ Un ejemplo de proyecto de dbt. Para empezar a usar dbt con Amazon MWAA, puede crear una bifurcación y clonar el [proyecto inicial de dbt](https://github.com/dbt-labs/dbt-starter-project) desde el repositorio dbt-labs. GitHub 

## Dependencias
<a name="samples-dbt-dependencies"></a>

Para usar Amazon MWAA con dbt, agregue el siguiente script de inicio a su entorno. Para obtener más información, consulte [Cómo usar un script de inicio con Amazon MWAA](using-startup-script.md).

```
#!/bin/bash
			
  if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]]
    then
      exit 0
  fi
			
  echo "------------------------------"
  echo "Installing virtual Python env"
  echo "------------------------------"
			
  pip3 install --upgrade pip
			
  echo "Current Python version:"
  python3 --version 
  echo "..."
			
  sudo pip3 install --user virtualenv
  sudo mkdir python3-virtualenv
  cd python3-virtualenv
  sudo python3 -m venv dbt-env
  sudo chmod -R 777 *
			
  echo "------------------------------"
  echo "Activating venv in"
  $DBT_ENV_PATH
	  		echo "------------------------------"
			
  source dbt-env/bin/activate
  pip3 list
			
  echo "------------------------------"
  echo "Installing libraries..."
  echo "------------------------------"
			
  # do not use sudo, as it will install outside the venv
  pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1
			
  echo "------------------------------"
  echo "Venv libraries..."
  echo "------------------------------"
			
  pip3 list
  dbt --version
			
  echo "------------------------------"
  echo "Deactivating venv..."
  echo "------------------------------"
			
  deactivate
```

En las secciones siguientes, cargará el directorio de su proyecto dbt a Amazon S3 y ejecutará un DAG que valide si Amazon MWAA ha instalado las dependencias dbt requeridas correctamente.

## Carga de un proyecto de dbt en Amazon S3
<a name="samples-dbt-upload-project"></a>

Para poder utilizar un proyecto dbt con su entorno Amazon MWAA, cargue todo el directorio del proyecto a la carpeta de `dags` de su entorno. Cuando el entorno se actualice, Amazon MWAA descargará el directorio dbt en la carpeta local de `usr/local/airflow/dags/`.

**Pasos para cargar un proyecto de dbt en Amazon S3**

1. Vaya al directorio en el que clonó el proyecto de inicio de dbt.

1. Ejecute el siguiente AWS CLI comando de Amazon S3 para copiar de forma recursiva el contenido del proyecto en la `dags` carpeta de su entorno mediante el `--recursive` parámetro. El comando creará un subdirectorio llamado “`dbt`” que puede usar para todos sus proyectos de dbt. Si el subdirectorio ya existe, los archivos del proyecto se copiarán en el directorio existente, no se creará un nuevo directorio. El comando también creará un subdirectorio dentro del directorio de `dbt` para este proyecto inicial específico.

   ```
   aws s3 cp dbt-starter-project s3://amzn-s3-demo-bucket/dags/dbt/dbt-starter-project --recursive
   ```

   Puede utilizar diferentes nombres para los subdirectorios de los proyectos a fin de organizar varios proyectos de dbt dentro del directorio principal de `dbt`.

## Uso de un DAG para verificar la instalación de la dependencia de dbt
<a name="samples-dbt-test-dependencies"></a>

El siguiente DAG utiliza un `BashOperator` y un comando de bash para comprobar si Amazon MWAA ha instalado correctamente las dependencias dbt especificadas en el archivo `requirements.txt`.

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version"
			)
```

Realice los siguientes pasos para ver los registros de tareas y comprobar que dbt y sus dependencias se hayan instalado.

1. Vaya a la consola de Amazon MWAA y, a continuación, seleccione **Abrir interfaz de usuario de Airflow** en la lista de entornos disponibles.

1. En la UI de Apache Airflow, busque el DAG de `dbt-installation-test` en la lista y, a continuación, elija la fecha que aparece debajo de la columna `Last Run` para abrir la última tarea completada.

1. Con **Vista de gráfico**, elija la tarea `bash_command` para abrir los detalles de la instancia de la tarea.

1. Seleccione **Registro** para abrir los registros de tareas y, a continuación, compruebe que muestran la versión de dbt especificada en `requirements.txt` correctamente.

## Uso de un DAG para ejecutar un proyecto dbt
<a name="samples-dbt-run-project"></a>

El siguiente DAG utiliza un `BashOperator` para copiar los proyectos dbt que ha cargado en Amazon S3 desde el directorio local `usr/local/airflow/dags/` al directorio accesible para escritura `/tmp` y, a continuación, ejecuta el proyecto dbt. Los comandos de bash asumen un proyecto dbt inicial titulado “`dbt-starter-project`”. Modifique el nombre del directorio de acuerdo con el nombre del directorio de su proyecto.

```
from airflow import DAG
			from airflow.operators.bash_operator import BashOperator
			from airflow.utils.dates import days_ago
			
			import os
			
			DAG_ID = os.path.basename(__file__).replace(".py", "")
			
			# assumes all files are in a subfolder of DAGs called dbt
			
			with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
			cli_command = BashOperator(
			task_id="bash_command",
			bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\
			cp -R /usr/local/airflow/dags/dbt /tmp;\
			echo 'listing project files:';\
			ls -R /tmp;\
			cd /tmp/dbt/mwaa_dbt_test_project;\
			/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\
			cat /tmp/dbt_logs/dbt.log;\
			rm -rf /tmp/dbt/mwaa_dbt_test_project"
			)
```

## AWS blogs y tutoriales
<a name="samples-blogs-tutorials"></a>
+ [Uso de Amazon EKS y Amazon MWAA for Apache Airflow v2.x](https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12)