

# Code examples for Amazon Managed Workflows for Apache Airflow
<a name="sample-code"></a>

This guide contains code samples, including DAGs and custom plugins, that you can use on an Amazon Managed Workflows for Apache Airflow environment. For more examples of using Apache Airflow with AWS services, refer to the [https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags](https://github.com/aws-samples/amazon-mwaa-examples/tree/main/dags) directory in the `aws-samples/amazon-mwaa-examples` GitHub repository.

**Topics**
+ [Using a DAG to import variables in the CLI](samples-variables-import.md)
+ [Creating an SSH connection using the `SSHOperator`](samples-ssh.md)
+ [Using a secret key in AWS Secrets Manager for an Apache Airflow Snowflake connection](samples-sm-snowflake.md)
+ [Using a DAG to write custom metrics in CloudWatch](samples-custom-metrics.md)
+ [Aurora PostgreSQL database cleanup on an Amazon MWAA environment](samples-database-cleanup.md)
+ [Exporting environment metadata to CSV files on Amazon S3](samples-dag-run-info-to-csv.md)
+ [Using a secret key in AWS Secrets Manager for an Apache Airflow variable](samples-secrets-manager-var.md)
+ [Using a secret key in AWS Secrets Manager for an Apache Airflow connection](samples-secrets-manager.md)
+ [Creating a custom plugin with Oracle](samples-oracle.md)
+ [Changing a DAG's timezone on Amazon MWAA](samples-plugins-timezone.md)
+ [Refreshing a CodeArtifact token](samples-code-artifact.md)
+ [Creating a custom plugin with Apache Hive and Hadoop](samples-hive.md)
+ [Creating a custom plugin for Apache Airflow PythonVirtualenvOperator](samples-virtualenv.md)
+ [Invoking DAGs with a Lambda function](samples-lambda.md)
+ [Invoking DAGs in different Amazon MWAA environments](samples-invoke-dag.md)
+ [Using Amazon MWAA with Amazon RDS for Microsoft SQL Server](samples-sql-server.md)
+ [Using Amazon MWAA with Amazon EKS](mwaa-eks-example.md)
+ [Connecting to Amazon ECS using the `ECSOperator`](samples-ecs-operator.md)
+ [Using dbt with Amazon MWAA](samples-dbt.md)
+ [AWS blogs and tutorials](#samples-blogs-tutorials)

# Using a DAG to import variables in the CLI
<a name="samples-variables-import"></a>

The following sample code imports variables using the CLI on Amazon Managed Workflows for Apache Airflow.

**Topics**
+ [Version](#samples-variables-import-version)
+ [Prerequisites](#samples-variables-import-prereqs)
+ [Permissions](#samples-variables-import-permissions)
+ [Dependencies](#samples-variables-import-dependencies)
+ [Code sample](#samples-variables-import-code)
+ [What's next?](#samples-variables-import-next-up)

## Version
<a name="samples-variables-import-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-variables-import-prereqs"></a>

No additional prerequisites are required to use the code example on this page.

## Permissions
<a name="samples-variables-import-permissions"></a>

Your AWS account needs access to the `AmazonMWAAAirflowCliAccess` policy. To learn more, refer to [Apache Airflow CLI policy: AmazonMWAAAirflowCliAccess](access-policies.md).

## Dependencies
<a name="samples-variables-import-dependencies"></a>

To use this code example with Apache Airflow v2 and later, no additional dependencies are required. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) to install Apache Airflow.

## Code sample
<a name="samples-variables-import-code"></a>

The following sample code takes three inputs: your Amazon MWAA environment name (in `mwaa_env`), the AWS Region of your environment (in `aws_region`), and the local file that contains the variables you want to import (in `var_file`).

```
import boto3
import json
import requests 
import base64
import getopt
import sys

argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''

try:
    # Long options need a trailing '=' so that getopt expects a value for them.
    opts, args = getopt.getopt(argv, 'e:v:r:', ['environment=', 'variable-file=', 'region='])
    if len(opts) != 3:
        print('Usage: -e MWAA environment -v variable file location and filename -r aws region')
    else:
        for opt, arg in opts:
            if opt in ("-e", "--environment"):
                mwaa_env = arg
            elif opt in ("-r", "--region"):
                aws_region = arg
            elif opt in ("-v", "--variable-file"):
                var_file = arg

        boto3.setup_default_session(region_name="{}".format(aws_region))
        mwaa_env_name = "{}".format(mwaa_env)

        client = boto3.client('mwaa')
        mwaa_cli_token = client.create_cli_token(
            Name=mwaa_env_name
        )
        
        with open ("{}".format(var_file), "r") as myfile:
            fileconf = myfile.read().replace('\n', '')

        json_dictionary = json.loads(fileconf)
        for key in json_dictionary:
            print(key, " ", json_dictionary[key])
            # str() avoids a TypeError when a JSON value is not a string.
            val = (key + " " + str(json_dictionary[key]))
            mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
            mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
            raw_data = "variables set {0}".format(val)
            mwaa_response = requests.post(
                mwaa_webserver_hostname,
                headers={
                    'Authorization': mwaa_auth_token,
                    'Content-Type': 'text/plain'
                    },
                data=raw_data
                )
            mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
            mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
            print(mwaa_response.status_code)
            print(mwaa_std_err_message)
            print(mwaa_std_out_message)

except Exception:
    print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
    print("Unexpected error:", sys.exc_info()[0])
    sys.exit(2)
```
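
The script reads the variable file as a flat JSON object and sends one `variables set` command per key. As a minimal sketch of that mapping, the following builds the request bodies without contacting Amazon MWAA. The helper name `build_variable_commands` and the sample keys are hypothetical and are not part of the script above.

```python
import json


def build_variable_commands(json_text):
    """Return one 'variables set KEY VALUE' command per top-level key.

    Hypothetical helper for illustration only. It mirrors the string that the
    sample script sends as the request body and makes no network calls.
    """
    variables = json.loads(json_text)
    # str() keeps non-string JSON values (numbers, booleans) from raising TypeError.
    return [
        "variables set {0} {1}".format(key, str(value))
        for key, value in variables.items()
    ]


# Assumed example content for the file passed with -v.
sample_file = '{"environment": "dev", "retries": 3}'
commands = build_variable_commands(sample_file)
for command in commands:
    print(command)
```

This sketch assumes single-token values. A value that contains spaces might be split into separate arguments by this simple formatting.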

## What's next?
<a name="samples-variables-import-next-up"></a>
+ Learn how to upload the DAG code in this example to the `dags` folder in your Amazon S3 bucket in [Adding or updating DAGs](configuring-dag-folder.md).

# Creating an SSH connection using the `SSHOperator`
<a name="samples-ssh"></a>

The following example describes how you can use the `SSHOperator` in a directed acyclic graph (DAG) to connect to a remote Amazon EC2 instance from your Amazon Managed Workflows for Apache Airflow environment. You can use a similar approach to connect to any remote instance with SSH access.

In the following example, you upload an SSH secret key (`.pem`) to your environment's `dags` directory on Amazon S3. Then, you install the necessary dependencies using `requirements.txt` and create a new Apache Airflow connection in the UI. Finally, you write a DAG that creates an SSH connection to the remote instance.

**Topics**
+ [Version](#samples-ssh-version)
+ [Prerequisites](#samples-ssh-prereqs)
+ [Permissions](#samples-ssh-permissions)
+ [Requirements](#samples-ssh-dependencies)
+ [Copy your secret key to Amazon S3](#samples-ssh-secret)
+ [Create a new Apache Airflow connection](#samples-ssh-connection)
+ [Code sample](#samples-ssh-code)

## Version
<a name="samples-ssh-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-ssh-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ An [Amazon MWAA environment](get-started.md).
+ An SSH secret key. The code sample assumes you have an Amazon EC2 instance and a `.pem` in the same Region as your Amazon MWAA environment. If you don't have a key, refer to [Create or import a key pair](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair) in the *Amazon EC2 User Guide*.

## Permissions
<a name="samples-ssh-permissions"></a>

No additional permissions are required to use the code example on this page.

## Requirements
<a name="samples-ssh-dependencies"></a>

Add the following parameter to `requirements.txt` to install the `apache-airflow-providers-ssh` package on the webserver. Once your environment updates and Amazon MWAA successfully installs the dependency, you will get a new **SSH** connection type in the UI.

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt
apache-airflow-providers-ssh
```

**Note**  
`-c` defines the constraints URL in `requirements.txt`. This ensures that Amazon MWAA installs the correct package version for your environment.
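
The constraints URL follows the pattern shown in the placeholder above, with your Apache Airflow version and Python version substituted in. As a rough sketch, the following helper assembles the URL and prints the two `requirements.txt` lines. The function name `constraints_url` is hypothetical, and the versions `2.10.3` and `3.11` are example values only; use the versions that match your environment.

```python
def constraints_url(airflow_version, python_version):
    """Build the Apache Airflow constraints URL for the given versions.

    Hypothetical helper for illustration; it only formats a string.
    """
    base = "https://raw.githubusercontent.com/apache/airflow"
    return "{0}/constraints-{1}/constraints-{2}.txt".format(
        base, airflow_version, python_version
    )


# Example values only; substitute the versions of your own environment.
url = constraints_url("2.10.3", "3.11")
print("-c " + url)
print("apache-airflow-providers-ssh")
```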

## Copy your secret key to Amazon S3
<a name="samples-ssh-secret"></a>

Use the following AWS Command Line Interface command to copy your `.pem` key to your environment's `dags` directory in Amazon S3.

```
aws s3 cp your-secret-key.pem s3://amzn-s3-demo-bucket/dags/
```

Amazon MWAA copies the content in `dags`, including the `.pem` key, to the local `/usr/local/airflow/dags/` directory so that Apache Airflow can access the key.

## Create a new Apache Airflow connection
<a name="samples-ssh-connection"></a>

**To create a new SSH connection using the Apache Airflow UI**

1. Open the [Environments](https://console.aws.amazon.com/mwaa/home#/environments) page on the Amazon MWAA console.

1. From the list of environments, choose **Open Airflow UI** for your environment.

1. On the Apache Airflow UI page, choose **Admin** from the main navigation bar to expand the dropdown list, then choose **Connections**.

1. On the **List Connections** page, choose **+**, or the **Add a new record** button, to add a new connection.

1. On the **Add Connection** page, add the following information:

   1. For **Connection Id**, enter **ssh_new**.

   1. For **Connection Type**, choose **SSH** from the dropdown list.
      **Note**  
      If the **SSH** connection type is not available in the list, Amazon MWAA hasn't installed the required `apache-airflow-providers-ssh` package. Update your `requirements.txt` file to include this package, then try again.

   1. For **Host**, enter the IP address for the Amazon EC2 instance that you want to connect to. For example, **12.345.67.89**.

   1. For **Username**, enter **ec2-user** if you are connecting to an Amazon EC2 instance. Your username might be different, depending on the type of remote instance you want Apache Airflow to connect to.

   1. For **Extra**, enter the following key-value pair in JSON format:

      ```
      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }
      ```

      This key-value pair instructs Apache Airflow to search for the secret key in the local `/dags` directory.
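
The **Extra** value is plain JSON that points at the key file inside the local `dags` directory. As a small sketch, assuming the key keeps its file name when Amazon MWAA copies it, you could generate the value as follows. The helper name `ssh_extra` is hypothetical.

```python
import json
import posixpath

# Local directory that Amazon MWAA syncs the dags folder to, as described above.
DAGS_DIR = "/usr/local/airflow/dags"


def ssh_extra(key_filename, dags_dir=DAGS_DIR):
    """Return the JSON string for the connection's Extra field.

    Hypothetical helper for illustration; it only builds a string.
    """
    key_path = posixpath.join(dags_dir, key_filename)
    return json.dumps({"key_file": key_path})


extra = ssh_extra("your-secret-key.pem")
print(extra)
```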

## Code sample
<a name="samples-ssh-code"></a>

The following DAG uses the `SSHOperator` to connect to your target Amazon EC2 instance, then runs the `hostname` Linux command to print the name of the instance. You can modify the DAG to run any command or script on the remote instance.

1. Open a terminal, and navigate to the directory where your DAG code is stored. For example:

   ```
   cd dags
   ```

1. Copy the contents of the following code sample and save locally as `ssh.py`.

   ```
   from airflow.decorators import dag
   from datetime import datetime
   from airflow.providers.ssh.operators.ssh import SSHOperator
   
   @dag(
       dag_id="ssh_operator_example",
       schedule_interval=None,     
       start_date=datetime(2022, 1, 1),
       catchup=False,
       )
   def ssh_dag():
       task_1=SSHOperator(
           task_id="ssh_task",
           ssh_conn_id='ssh_new',
           command='hostname',
       )
   
   my_ssh_dag = ssh_dag()
   ```

1. Run the following AWS CLI command to copy the DAG to your environment's bucket, then trigger the DAG using the Apache Airflow UI.

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. If successful, you'll get output similar to the following in the task logs for `ssh_task` in the `ssh_operator_example` DAG:

   ```
   [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
   Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
   [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks
   [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) 
   [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
   [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
   [2022-01-01, 12:00:00 UTC] {{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
   [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916
   ```

# Using a secret key in AWS Secrets Manager for an Apache Airflow Snowflake connection
<a name="samples-sm-snowflake"></a>

The following sample calls AWS Secrets Manager to get a secret key for an Apache Airflow Snowflake connection on Amazon Managed Workflows for Apache Airflow. It assumes you've completed the steps in [Configuring an Apache Airflow connection using an AWS Secrets Manager secret](connections-secrets-manager.md).

**Topics**
+ [Version](#samples-sm-snowflake-version)
+ [Prerequisites](#samples-sm-snowflake-prereqs)
+ [Permissions](#samples-sm-snowflake-permissions)
+ [Requirements](#samples-sm-snowflake-dependencies)
+ [Code sample](#samples-sm-snowflake-code)
+ [What's next?](#samples-sm-snowflake-next-up)

## Version
<a name="samples-sm-snowflake-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-sm-snowflake-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ The Secrets Manager backend as an Apache Airflow configuration option as listed in [Configuring an Apache Airflow connection using an AWS Secrets Manager secret](connections-secrets-manager.md).
+ An Apache Airflow connection string in Secrets Manager as listed in [Configuring an Apache Airflow connection using an AWS Secrets Manager secret](connections-secrets-manager.md).

## Permissions
<a name="samples-sm-snowflake-permissions"></a>
+ Secrets Manager permissions as listed in [Configuring an Apache Airflow connection using an AWS Secrets Manager secret](connections-secrets-manager.md).

## Requirements
<a name="samples-sm-snowflake-dependencies"></a>

To use the sample code on this page, add the following dependencies to your `requirements.txt`. To learn more, refer to [Installing Python dependencies](working-dags-dependencies.md).

```
apache-airflow-providers-snowflake==1.3.0
```

## Code sample
<a name="samples-sm-snowflake-code"></a>

The following steps describe how to create the DAG code that calls Secrets Manager to get the secret.

1. In your command prompt, navigate to the directory where your DAG code is stored. For example:

   ```
   cd dags
   ```

1. Copy the contents of the following code sample and save locally as `snowflake_connection.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
   from airflow.utils.dates import days_ago
   
   snowflake_query = [
       """use warehouse "MY_WAREHOUSE";""",
       """select * from "SNOWFLAKE_SAMPLE_DATA"."WEATHER"."WEATHER_14_TOTAL" limit 100;""",
   ]
   
   with DAG(dag_id='snowflake_test', schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       snowflake_select = SnowflakeOperator(
           task_id="snowflake_select",
           sql=snowflake_query,
           snowflake_conn_id="snowflake_conn",
       )
   ```

## What's next?
<a name="samples-sm-snowflake-next-up"></a>
+ Learn how to upload the DAG code in this example to the `dags` folder in your Amazon S3 bucket in [Adding or updating DAGs](configuring-dag-folder.md).

# Using a DAG to write custom metrics in CloudWatch
<a name="samples-custom-metrics"></a>

You can use the following code example to write a directed acyclic graph (DAG) that runs a `PythonOperator` to retrieve OS-level metrics for an Amazon MWAA environment. The DAG then publishes the data as custom metrics to Amazon CloudWatch.

Custom OS-level metrics provide you with additional visibility about how your environment workers are utilizing resources such as virtual memory and CPU. You can use this information to select the [environment class](environment-class.md) that best suits your workload.

**Topics**
+ [Version](#samples-custom-metrics-version)
+ [Prerequisites](#samples-custom-metrics-prereqs)
+ [Permissions](#samples-custom-metrics-permissions)
+ [Dependencies](#samples-custom-metrics-dependencies)
+ [Code example](#samples-custom-metrics-code)

## Version
<a name="samples-custom-metrics-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-custom-metrics-prereqs"></a>

To use the code example on this page, you need the following:
+ An [Amazon MWAA environment](get-started.md).

## Permissions
<a name="samples-custom-metrics-permissions"></a>

No additional permissions are required to use the code example on this page.

## Dependencies
<a name="samples-custom-metrics-dependencies"></a>
+ No additional dependencies are required to use the code example on this page.

## Code example
<a name="samples-custom-metrics-code"></a>

1. In your command prompt, navigate to the folder where your DAG code is stored. For example:

   ```
   cd dags
   ```

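1. Copy the contents of the following code example and save it locally as `dag-custom-metrics.py`. The sample reads the environment name from the `MWAA_ENV_NAME` environment variable. If that variable is not set in your environment, replace the `os.getenv` call with your environment name.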

   ```
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from datetime import datetime
   import os,json,boto3,psutil,socket
   
   def publish_metric(client,name,value,cat,unit='None'):
       environment_name = os.getenv("MWAA_ENV_NAME")
       value_number=float(value)
       hostname = socket.gethostname()
       ip_address = socket.gethostbyname(hostname)
       print('writing value',value_number,'to metric',name)
       response = client.put_metric_data(
           Namespace='MWAA-Custom',
           MetricData=[
               {
                   'MetricName': name,
                   'Dimensions': [
                       {
                           'Name': 'Environment',
                           'Value': environment_name
                       },
                       {
                           'Name': 'Category',
                           'Value': cat
                       },       
                       {
                           'Name': 'Host',
                           'Value': ip_address
                       },                                     
                   ],
                   'Timestamp': datetime.now(),
                   'Value': value_number,
                   'Unit': unit
               },
           ]
       )
       print(response)
       return response
   
   def python_fn(**kwargs):
       client = boto3.client('cloudwatch')
   
       cpu_stats = psutil.cpu_stats()
       print('cpu_stats', cpu_stats)
   
       virtual = psutil.virtual_memory()
       cpu_times_percent = psutil.cpu_times_percent(interval=0)
   
       publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes')
       publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent')
   
       publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent')
       publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent')
   
       return "OK"
   
   
   with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag:
       # provide_context is not needed in Apache Airflow v2; the context is passed automatically.
       t = PythonOperator(task_id="memory_test", python_callable=python_fn)
   ```

1. Run the following AWS CLI command to copy the DAG to your environment's bucket, then trigger the DAG using the Apache Airflow UI.

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. If the DAG runs successfully, you get something similar to the following in your Apache Airflow logs:

   ```
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
   [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
   ...
   [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
   [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
   ```
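
The payload that `publish_metric` sends can be sketched as a pure function, which makes it possible to inspect the metric structure without calling CloudWatch. This is an illustrative sketch only: the helper name `build_metric_datum`, the environment name, and the host address are assumptions, and the sketch uses a timezone-aware UTC timestamp rather than the naive `datetime.now()` in the DAG.

```python
from datetime import datetime, timezone


def build_metric_datum(name, value, category, environment_name, host, unit="None"):
    """Return one MetricData entry shaped like the one in publish_metric.

    Hypothetical helper for illustration; it makes no AWS calls.
    """
    return {
        "MetricName": name,
        "Dimensions": [
            {"Name": "Environment", "Value": environment_name},
            {"Name": "Category", "Value": category},
            {"Name": "Host", "Value": host},
        ],
        # Timezone-aware timestamp in UTC (an assumption of this sketch).
        "Timestamp": datetime.now(timezone.utc),
        "Value": float(value),
        "Unit": unit,
    }


datum = build_metric_datum(
    name="virtual_memory_percent",
    value=8.4,
    category="virtual_memory",
    environment_name="MyAirflowEnvironment",  # assumed example name
    host="10.0.0.1",  # assumed example address
    unit="Percent",
)
print(datum["MetricName"], datum["Value"], datum["Unit"])
```

A dictionary built this way has the same shape as the single entry of the `MetricData` list that the DAG passes to `put_metric_data`.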

# Aurora PostgreSQL database cleanup on an Amazon MWAA environment
<a name="samples-database-cleanup"></a>

Amazon Managed Workflows for Apache Airflow uses an Aurora PostgreSQL database as the Apache Airflow metadata database, where DAG runs and task instances are stored. The following sample code periodically clears out entries from the dedicated Aurora PostgreSQL database for your Amazon MWAA environment.

**Topics**
+ [Version](#samples-database-cleanup-version)
+ [Prerequisites](#samples-database-cleanup-prereqs)
+ [Dependencies](#samples-sql-server-dependencies)
+ [Code sample](#samples-database-cleanup-code)

## Version
<a name="samples-database-cleanup-version"></a>

The code samples on this page are specific to Apache Airflow v2 supported on Amazon MWAA. Refer to the [supported Apache Airflow versions](airflow-versions.md).

**Tip**  
**For Apache Airflow v3 users**: If you want to clean up a database (purge old records from metastore tables), run the `db clean` CLI command.

## Prerequisites
<a name="samples-database-cleanup-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ An [Amazon MWAA environment](get-started.md).

## Dependencies
<a name="samples-sql-server-dependencies"></a>

To use this code example with Apache Airflow v2, no additional dependencies are required. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) to install Apache Airflow.

## Code sample
<a name="samples-database-cleanup-code"></a>

The following DAG cleans the metadata database for the tables specified in `TABLES_TO_CLEAN`. The example deletes data from the specified tables that is older than 30 days. To adjust how far back the entries are deleted, set `MAX_AGE_IN_DAYS` to a different value.

------
#### [ Apache Airflow v2.4 to 2.10.3 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```
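
To see which command the DAG above runs for a given retention period, the string assembly can be sketched outside of Apache Airflow. The helper name `build_db_clean_command`, the fixed date, and the table names `log,xcom` are example assumptions. In the DAG itself, the timestamp comes from the `timestamp` parameter and is rendered by the template at run time.

```python
from datetime import datetime, timedelta


def build_db_clean_command(now, max_age_in_days, tables=None):
    """Return the 'airflow db clean' command string for a cutoff date.

    Hypothetical helper for illustration; it mirrors the string built in
    the DAG but resolves the timestamp directly instead of using a template.
    """
    cutoff = (now - timedelta(days=max_age_in_days)).strftime("%Y-%m-%d %H:%M:%S")
    command = "airflow db clean --clean-before-timestamp '{0}'".format(cutoff)
    if tables:
        # Comma-separated table list, as described in the DAG comments.
        command += " --tables '{0}'".format(tables)
    return command + " --skip-archive --yes"


# Fixed example date so that the output is reproducible.
fixed_now = datetime(2024, 1, 31, 12, 0, 0)
print(build_db_clean_command(fixed_now, 30))
print(build_db_clean_command(fixed_now, 30, tables="log,xcom"))
```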

------
#### [ Apache Airflow v2.2 and earlier ]

```
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if (major_version, minor_version) >= (2, 6):
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)
def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
```

------
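
The DAG for earlier versions deletes rows in batches rather than in one query. As a minimal sketch of that batching, the following reproduces the loop arithmetic from `cleanup_db_fn` and lists the day ranges it walks through, using the same constants. The helper name `deletion_windows` is hypothetical, and the sketch does not touch the database.

```python
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7


def deletion_windows(max_age, min_age, decrement):
    """Return (earliest_days_ago, oldest_days_ago) pairs, one per batch.

    Hypothetical helper for illustration; it mirrors the range() loop in
    cleanup_db_fn without running any queries.
    """
    windows = []
    for oldest_days_ago in range(max_age, min_age, decrement):
        earliest_days_ago = max(oldest_days_ago + decrement, min_age)
        windows.append((earliest_days_ago, oldest_days_ago))
    return windows


windows = deletion_windows(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT)
print(windows)
```

With these constants, each batch covers at most seven days, and the last batch is shortened so that it stops at `MIN_AGE_IN_DAYS`.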

# Exporting environment metadata to CSV files on Amazon S3
<a name="samples-dag-run-info-to-csv"></a>

Use the following code example to create a directed acyclic graph (DAG) that queries the database for a range of DAG run information, and writes the data to `.csv` files stored on Amazon S3.

You might want to export information from your environment's Aurora PostgreSQL database to inspect the data locally or to archive it in object storage. You can also combine the export with tools such as the [Amazon S3 to Amazon Redshift operator](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) and the [database cleanup](samples-database-cleanup.md) to move Amazon MWAA metadata out of the environment while preserving it for future analysis.

You can query the database for any of the objects listed in [Apache Airflow models](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models). This code sample uses three models, `DagRun`, `TaskFail`, and `TaskInstance`, which provide information relevant to DAG runs.

**Topics**
+ [Version](#samples-dag-run-info-to-csv-version)
+ [Prerequisites](#samples-dag-run-info-to-csv-prereqs)
+ [Permissions](#samples-dag-run-info-to-csv-permissions)
+ [Requirements](#samples-dag-run-info-to-csv-dependencies)
+ [Code sample](#samples-dag-run-info-to-csv-code)

## Version
<a name="samples-dag-run-info-to-csv-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-dag-run-info-to-csv-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ An [Amazon MWAA environment](get-started.md).
+ A [new Amazon S3 bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) where you want to export your metadata information.

## Permissions
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA needs permission for the Amazon S3 action `s3:PutObject` to write the queried metadata information to Amazon S3. Add the following policy statement to your environment's execution role.

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket/*"
}
```

This policy limits write access to only *amzn-s3-demo-bucket*.

## Requirements
<a name="samples-dag-run-info-to-csv-dependencies"></a>

To use this code example with Apache Airflow v2 and later, no additional dependencies are required. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) to install Apache Airflow.

## Code sample
<a name="samples-dag-run-info-to-csv-code"></a>

The following steps describe how you can create a DAG that queries the Aurora PostgreSQL database and writes the result to your new Amazon S3 bucket.

1. In your terminal, navigate to the directory where your DAG code is stored. For example:

   ```
   cd dags
   ```

1. Copy the contents of the following code example and save it locally as `metadata_to_csv.py`. You can change the value assigned to `MAX_AGE_IN_DAYS` to control the age of the oldest records your DAG queries from the metadata database.

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadata database.
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       # Export only records newer than this cutoff date.
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           # x[0] is the model class and x[1] is the date column to filter on.
           query = session.query(x[0]).filter(x[1] >= oldest_date)
           print("type",type(query))
           allrows=query.all()
           # Turn "<class 'airflow.models.dagrun.DagRun'>" into "class airflow.models.dagrun.DagRun".
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               # Build the CSV in memory, using the first row's attributes as the header.
               f = StringIO()
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               # name[6:] drops the leading "class " from the name.
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
   
       session.close()
       return "OK"
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1. Run the following AWS CLI command to copy the DAG to your environment's bucket, then trigger the DAG using the Apache Airflow UI.

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. If successful, you'll see output similar to the following in the task logs for the `export_db` task:

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   You can now access and download the exported `.csv` files in your new Amazon S3 bucket in `/files/export/`.
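
To try the serialization logic without a database or an S3 bucket, you can run the key-naming and CSV steps locally. The following is a minimal sketch under stated assumptions: `Row` is a hypothetical stand-in for a SQLAlchemy model instance, and `export_key` and `rows_to_csv` are illustrative helper names that do not appear in the DAG.

```python
import csv
import re
from collections import OrderedDict
from io import StringIO

S3_KEY = "files/export/{0}.csv"


def export_key(model_class):
    """Derive the S3 object key from a class the same way the DAG does."""
    # str(cls) looks like "<class 'package.module.Name'>". Strip the brackets
    # and quotes, then drop the leading "class " (6 characters).
    name = re.sub("[<>']", "", str(model_class))
    return S3_KEY.format(name[6:])


class Row:
    """Hypothetical stand-in for a SQLAlchemy model instance."""

    def __init__(self, dag_id, state):
        self.dag_id = dag_id
        self.state = state


def rows_to_csv(rows):
    """Serialize objects to CSV text, using their instance attributes as columns."""
    buffer = StringIO()
    writer = csv.DictWriter(buffer, fieldnames=list(vars(rows[0]).keys()))
    writer.writeheader()
    for row in rows:
        writer.writerow(vars(row))
    return buffer.getvalue()


if __name__ == "__main__":
    # OrderedDict is used only because its module path is predictable.
    print(export_key(OrderedDict))
    print(rows_to_csv([Row("example_dag", "success")]))
```

Because the DAG builds its columns with `vars()`, real model instances also contribute SQLAlchemy's internal `_sa_instance_state` attribute, so expect that column in the exported files.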

# Using a secret key in AWS Secrets Manager for an Apache Airflow variable
<a name="samples-secrets-manager-var"></a>

The following sample calls AWS Secrets Manager to get a secret key for an Apache Airflow variable on Amazon Managed Workflows for Apache Airflow. It assumes you've completed the steps in [Configuring an Apache Airflow connection using a AWS Secrets Manager secret](connections-secrets-manager.md).

**Topics**
+ [Version](#samples-secrets-manager-var-version)
+ [Prerequisites](#samples-secrets-manager-var-prereqs)
+ [Permissions](#samples-secrets-manager-var-permissions)
+ [Requirements](#samples-hive-dependencies)
+ [Code sample](#samples-secrets-manager-var-code)
+ [What's next?](#samples-secrets-manager-var-next-up)

## Version
<a name="samples-secrets-manager-var-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-secrets-manager-var-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ The Secrets Manager backend as an Apache Airflow configuration option as listed in [Configuring an Apache Airflow connection using a AWS Secrets Manager secret](connections-secrets-manager.md).
+ An Apache Airflow variable string in Secrets Manager as listed in [Configuring an Apache Airflow connection using a AWS Secrets Manager secret](connections-secrets-manager.md).

## Permissions
<a name="samples-secrets-manager-var-permissions"></a>
+ Secrets Manager permissions as listed in [Configuring an Apache Airflow connection using a AWS Secrets Manager secret](connections-secrets-manager.md).

## Requirements
<a name="samples-hive-dependencies"></a>

To use this code example with Apache Airflow v2 and later, no additional dependencies are required. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) to install Apache Airflow.

## Code sample
<a name="samples-secrets-manager-var-code"></a>

The following steps describe how to create the DAG code that calls Secrets Manager to get the secret.

1. In your command prompt, navigate to the directory where your DAG code is stored. For example:

   ```
   cd dags
   ```

1. Copy the contents of the following code sample and save locally as `secrets-manager-var.py`.

   ```
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.models import Variable
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   import os
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['airflow@example.com'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   def get_variable_fn(**kwargs):
       my_variable_name = Variable.get("test-variable", default_var="undefined")
       print("my_variable_name: ", my_variable_name)
       return my_variable_name
   with DAG(
       dag_id=DAG_ID,
       default_args=DEFAULT_ARGS,
       dagrun_timeout=timedelta(hours=2),
       start_date=days_ago(1),
       schedule_interval='@once',
       tags=['variable']
   ) as dag:
       get_variable = PythonOperator(
           task_id="get_variable",
           python_callable=get_variable_fn
       )
   ```
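
With the Secrets Manager backend in place, `Variable.get("test-variable")` is answered from a secret whose name combines a prefix with the variable key, and `default_var` is returned when nothing matches. The following sketch illustrates that lookup order with a plain dictionary standing in for Secrets Manager. It assumes you configured the backend with the prefix `airflow/variables`; the helpers `variable_secret_name` and `resolve_variable` are hypothetical and are not Airflow APIs.

```python
def variable_secret_name(key, prefix="airflow/variables", sep="/"):
    """Build the secret name for a variable key (the prefix is an assumption)."""
    return f"{prefix}{sep}{key}"


def resolve_variable(key, secrets, default="undefined"):
    """Return the stored value for a variable, or the default when it is absent."""
    return secrets.get(variable_secret_name(key), default)


if __name__ == "__main__":
    # A dictionary stands in for Secrets Manager in this illustration.
    fake_store = {"airflow/variables/test-variable": "hello"}
    print(resolve_variable("test-variable", fake_store))
    print(resolve_variable("missing-variable", fake_store))
```

If the DAG prints `undefined`, a mismatch between the secret name and the configured prefix is a likely cause to check first.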

## What's next?
<a name="samples-secrets-manager-var-next-up"></a>
+ Learn how to upload the DAG code in this example to the `dags` folder in your Amazon S3 bucket in [Adding or updating DAGs](configuring-dag-folder.md).

# Using a secret key in AWS Secrets Manager for an Apache Airflow connection
<a name="samples-secrets-manager"></a>

The following sample calls AWS Secrets Manager to get a secret key for an Apache Airflow connection on Amazon Managed Workflows for Apache Airflow. It assumes you've completed the steps in [Configuring an Apache Airflow connection using a AWS Secrets Manager secret](connections-secrets-manager.md).

**Topics**
+ [Version](#samples-secrets-manager-version)
+ [Prerequisites](#samples-secrets-manager-prereqs)
+ [Permissions](#samples-secrets-manager-permissions)
+ [Requirements](#samples-hive-dependencies)
+ [Code sample](#samples-secrets-manager-code)
+ [What's next?](#samples-secrets-manager-next-up)

## Version
<a name="samples-secrets-manager-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-secrets-manager-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ The Secrets Manager backend as an Apache Airflow configuration option as listed in [Configuring an Apache Airflow connection using a AWS Secrets Manager secret](connections-secrets-manager.md).
+ An Apache Airflow connection string in Secrets Manager as listed in [Configuring an Apache Airflow connection using a AWS Secrets Manager secret](connections-secrets-manager.md).

## Permissions
<a name="samples-secrets-manager-permissions"></a>
+ Secrets Manager permissions as listed in [Configuring an Apache Airflow connection using a AWS Secrets Manager secret](connections-secrets-manager.md).

## Requirements
<a name="samples-hive-dependencies"></a>

To use this code example with Apache Airflow v2 and later, no additional dependencies are required. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) to install Apache Airflow.

## Code sample
<a name="samples-secrets-manager-code"></a>

The following steps describe how to create the DAG code that calls Secrets Manager to get the secret.

1. In your command prompt, navigate to the directory where your DAG code is stored. For example:

   ```
   cd dags
   ```

1. Copy the contents of the following code sample and save locally as `secrets-manager.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG, settings, secrets
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   from datetime import timedelta
   import os
   
   ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html
   sm_secretId_name = 'airflow/connections/myconn'
   
   default_args = {
       'owner': 'airflow',
       'start_date': days_ago(1),
       'depends_on_past': False
   }
   
   
   ### Gets the secret myconn from Secrets Manager
   def read_from_aws_sm_fn(**kwargs):
       ### set up Secrets Manager
       hook = AwsBaseHook(client_type='secretsmanager')
       client = hook.get_client_type(region_name='us-east-1')
       response = client.get_secret_value(SecretId=sm_secretId_name)
       myConnSecretString = response["SecretString"]
   
       return myConnSecretString
   
   ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager
   with DAG(
           dag_id=os.path.basename(__file__).replace(".py", ""),
           default_args=default_args,
           dagrun_timeout=timedelta(hours=2),
           start_date=days_ago(1),
           schedule_interval=None
   ) as dag:
       write_all_to_aws_sm = PythonOperator(
           task_id="read_from_aws_sm",
           python_callable=read_from_aws_sm_fn
       )
   ```
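
The task above returns the raw secret string. If you stored the secret as a connection URI, you can split it into its parts with the standard library to confirm that it contains what you expect. This is a minimal sketch that assumes the URI form (secrets can also be stored in other formats); `parse_connection_uri` is a hypothetical helper and the example URI is invented.

```python
from urllib.parse import unquote, urlsplit


def parse_connection_uri(uri):
    """Split a connection URI into the fields an Airflow connection uses."""
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        # Credentials are percent-encoded inside a URI, so decode them.
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "host": parts.hostname,
        "port": parts.port,
        "schema": parts.path.lstrip("/") or None,
    }


if __name__ == "__main__":
    # Invented example value, not a real credential.
    print(parse_connection_uri("mysql://user:p%40ss@db.example.com:3306/mydb"))
```

Avoid printing the parsed password in task logs. The example prints it only because the value is a placeholder.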

## What's next?
<a name="samples-secrets-manager-next-up"></a>
+ Learn how to upload the DAG code in this example to the `dags` folder in your Amazon S3 bucket in [Adding or updating DAGs](configuring-dag-folder.md).

# Creating a custom plugin with Oracle
<a name="samples-oracle"></a>

The following sample walks you through the steps to create a custom plugin using Oracle for Amazon MWAA. You can combine it with other custom plugins and binaries in your plugins.zip file.

**Contents**
+ [Version](#samples-oracle-version)
+ [Prerequisites](#samples-oracle-prereqs)
+ [Permissions](#samples-oracle-permissions)
+ [Requirements](#samples-oracle-dependencies)
+ [Code sample](#samples-oracle-code)
+ [Create the custom plugin](#samples-oracle-create-pluginszip-steps)
  + [Download dependencies](#samples-oracle-install)
  + [Custom plugin](#samples-oracle-plugins-code)
  + [Plugins.zip](#samples-oracle-pluginszip)
+ [Airflow configuration options](#samples-oracle-airflow-config)
+ [What's next?](#samples-oracle-next-up)

## Version
<a name="samples-oracle-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-oracle-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ An [Amazon MWAA environment](get-started.md).
+ Worker logging enabled at any log level, `CRITICAL` or above, for your environment. For more information about Amazon MWAA log types and how to manage your log groups, refer to [Accessing Airflow logs in Amazon CloudWatch](monitoring-airflow.md).

## Permissions
<a name="samples-oracle-permissions"></a>

No additional permissions are required to use the code example on this page.

## Requirements
<a name="samples-oracle-dependencies"></a>

To use the sample code on this page, add the following dependencies to your `requirements.txt`. To learn more, refer to [Installing Python dependencies](working-dags-dependencies.md).

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
cx_Oracle
apache-airflow-providers-oracle
```

## Code sample
<a name="samples-oracle-code"></a>

The following steps describe how to create the DAG code that will test the custom plugin.

1. In your command prompt, navigate to the directory where your DAG code is stored. For example:

   ```
   cd dags
   ```

1. Copy the contents of the following code sample and save locally as `oracle.py`.

   ```
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   import os
   import cx_Oracle
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   def testHook(**kwargs):
       cx_Oracle.init_oracle_client()
       version = cx_Oracle.clientversion()
       print("cx_Oracle.clientversion",version)
       return version
   
   with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hook_test = PythonOperator(
           task_id="hook_test",
           python_callable=testHook
       )
   ```

## Create the custom plugin
<a name="samples-oracle-create-pluginszip-steps"></a>

This section describes how to download the dependencies, create the custom plugin, and create the plugins.zip file.

### Download dependencies
<a name="samples-oracle-install"></a>

Amazon MWAA will extract the contents of plugins.zip into `/usr/local/airflow/plugins` on each Amazon MWAA scheduler and worker container. This is used to add binaries to your environment. The following steps describe how to assemble the files needed for the custom plugin.

**Pull the Amazon Linux container image**

1. In your command prompt, pull the Amazon Linux container image, and run the container locally. For example:

   ```
   docker pull amazonlinux
   docker run -it amazonlinux:latest /bin/bash
   ```

   Your command prompt now displays a bash command line inside the container. For example:

   ```
   bash-4.2#
   ```

1. Install the Linux-native asynchronous I/O facility (libaio).

   ```
   yum -y install libaio
   ```

1. Keep this window open for subsequent steps. We'll be copying the following files locally: `lib64/libaio.so.1`, `lib64/libaio.so.1.0.0`, `lib64/libaio.so.1.0.1`.

**Download client folder**

1. Install the unzip package locally. For example:

   ```
   sudo yum install unzip
   ```

1. Create an `oracle_plugin` directory. For example:

   ```
   mkdir oracle_plugin
   cd oracle_plugin
   ```

1. Use the following curl command to download the [instantclient-basic-linux.x64-18.5.0.0.0dbru.zip](https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip) from [Oracle Instant Client Downloads for Linux x86-64 (64-bit)](https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html).

   ```
   curl https://download.oracle.com/otn_software/linux/instantclient/185000/instantclient-basic-linux.x64-18.5.0.0.0dbru.zip > client.zip
   ```

1. Unzip the `client.zip` file. For example:

   ```
   unzip *.zip
   ```

**Extract files from Docker**

1. In a new command prompt, list your running containers and note your Docker container ID. For example:

   ```
   docker container ls
   ```

   The command returns all running containers and their IDs. For example:

   ```
   debc16fd6970
   ```

1. In your `oracle_plugin` directory, extract the `lib64/libaio.so.1`, `lib64/libaio.so.1.0.0`, `lib64/libaio.so.1.0.1` files to the local `instantclient_18_5` folder. For example:

   ```
   docker cp debc16fd6970:/lib64/libaio.so.1 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.0 instantclient_18_5/
   docker cp debc16fd6970:/lib64/libaio.so.1.0.1 instantclient_18_5/
   ```

### Custom plugin
<a name="samples-oracle-plugins-code"></a>

Apache Airflow will execute the contents of Python files in the plugins folder at startup. This is used to set and modify environment variables. The following steps describe the sample code for the custom plugin.
+ Copy the contents of the following code sample and save locally as `env_var_plugin_oracle.py`.

  ```
  from airflow.plugins_manager import AirflowPlugin
  import os
  
  os.environ["LD_LIBRARY_PATH"]='/usr/local/airflow/plugins/instantclient_18_5'
  os.environ["DPI_DEBUG_LEVEL"]="64"
  
  class EnvVarPlugin(AirflowPlugin):                
      name = 'env_var_plugin'
  ```
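
Before you package the plugin, it can help to confirm that the directory you point `LD_LIBRARY_PATH` at contains the shared libraries you copied. The following sketch checks a directory for a list of file names. The helper `missing_libraries` is hypothetical, and the file list is limited to the `libaio` names mentioned on this page.

```python
import os


def missing_libraries(directory, required):
    """Return the names in `required` that are not regular files inside `directory`."""
    return [
        name
        for name in required
        if not os.path.isfile(os.path.join(directory, name))
    ]


if __name__ == "__main__":
    # Local folder name taken from the steps on this page.
    client_dir = "instantclient_18_5"
    expected = ["libaio.so.1", "libaio.so.1.0.0", "libaio.so.1.0.1"]
    print("missing:", missing_libraries(client_dir, expected))
```

An empty list means every expected file is present. Run the check from the `oracle_plugin` directory so that the relative path resolves.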

### Plugins.zip
<a name="samples-oracle-pluginszip"></a>

The following steps explain how to create the `plugins.zip`. The contents of this example can be combined with your other plugins and binaries into a single `plugins.zip` file.

**Zip the contents of the plugin directory**

1. In your command prompt, navigate to the `oracle_plugin` directory. For example:

   ```
   cd oracle_plugin
   ```

1. List the contents of the directory. Your command prompt displays:

   ```
   oracle_plugin$ ls
   client.zip		instantclient_18_5
   ```

1. Remove the `client.zip` file so that it is not included in plugins.zip. For example:

   ```
   rm client.zip
   ```

1. Zip the `instantclient_18_5` directory in plugins.zip. For example:

   ```
   zip -r ../plugins.zip ./
   ```

**Zip the env\_var\_plugin\_oracle.py file**

1. Add the `env_var_plugin_oracle.py` file to the root of the plugins.zip. For example:

   ```
   zip plugins.zip env_var_plugin_oracle.py
   ```

1. Your plugins.zip now includes the following:

   ```
   env_var_plugin_oracle.py
   instantclient_18_5/
   ```
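
To confirm the archive layout before you upload it, you can list the top-level entries of `plugins.zip` with the Python standard library. This is a minimal sketch; `top_level_entries` is a hypothetical helper, and it assumes `plugins.zip` is in the current directory.

```python
import zipfile


def top_level_entries(zip_path):
    """Return the sorted top-level names in a zip archive, with directories ending in '/'."""
    with zipfile.ZipFile(zip_path) as archive:
        names = archive.namelist()
    top = set()
    for name in names:
        first, sep, _rest = name.partition("/")
        # A separator means the entry lives in (or is) a top-level directory.
        top.add(first + "/" if sep else first)
    return sorted(top)


if __name__ == "__main__":
    import os

    if os.path.exists("plugins.zip"):
        for entry in top_level_entries("plugins.zip"):
            print(entry)
```

If the output includes entries such as `client.zip` or a nested `oracle_plugin/` directory, rebuild the archive so that the plugin file and the client folder sit at the root.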

## Airflow configuration options
<a name="samples-oracle-airflow-config"></a>

If you're using Apache Airflow v2, add `core.lazy_load_plugins : False` as an Apache Airflow configuration option. To learn more, refer to [Using configuration options to load plugins in 2](configuring-env-variables.md#configuring-2.0-airflow-override).

## What's next?
<a name="samples-oracle-next-up"></a>
+ Learn how to upload the `requirements.txt` file in this example to your Amazon S3 bucket in [Installing Python dependencies](working-dags-dependencies.md).
+ Learn how to upload the DAG code in this example to the `dags` folder in your Amazon S3 bucket in [Adding or updating DAGs](configuring-dag-folder.md).
+ Learn more about how to upload the `plugins.zip` file in this example to your Amazon S3 bucket in [Installing custom plugins](configuring-dag-import-plugins.md).

# Changing a DAG's timezone on Amazon MWAA
<a name="samples-plugins-timezone"></a>

Apache Airflow schedules your directed acyclic graph (DAG) in UTC+0 by default. The following steps show how you can change the timezone in which Amazon MWAA runs your DAGs with [Pendulum](https://pypi.org/project/pendulum/). Optionally, this topic demonstrates how you can create a custom plugin to change the timezone for your environment's Apache Airflow logs.

**Topics**
+ [Version](#samples-plugins-timezone-version)
+ [Prerequisites](#samples-plugins-timezone-prerequisites)
+ [Permissions](#samples-plugins-timezone-permissions)
+ [Create a plugin to change the timezone in Airflow logs](#samples-plugins-timezone-custom-plugin)
+ [Create a `plugins.zip`](#samples-plugins-timezone-plugins-zip)
+ [Code sample](#samples-plugins-timezone-dag)
+ [What's next?](#samples-plugins-timezone-plugins-next-up)

## Version
<a name="samples-plugins-timezone-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-plugins-timezone-prerequisites"></a>

To use the sample code on this page, you'll need the following:
+ An [Amazon MWAA environment](get-started.md).

## Permissions
<a name="samples-plugins-timezone-permissions"></a>

No additional permissions are required to use the code example on this page.

## Create a plugin to change the timezone in Airflow logs
<a name="samples-plugins-timezone-custom-plugin"></a>

Apache Airflow runs the Python files in the `plugins` directory at startup. With the following plugin, you can override the executor's timezone, which modifies the timezone in which Apache Airflow writes logs.

1. Create a directory named `plugins` for your custom plugin, and navigate to the directory. For example:

   ```
   mkdir plugins
   cd plugins
   ```

1. Copy the contents of the following code sample and save locally as `dag-timezone-plugin.py` in the `plugins` folder.

   ```
   import time
   import os
   
   os.environ['TZ'] = 'America/Los_Angeles'
   time.tzset()
   ```

1. In the `plugins` directory, create an empty Python file named `__init__.py`. Your `plugins` directory should be similar to the following:

   ```
   plugins/
    |-- __init__.py
    |-- dag-timezone-plugin.py
   ```

## Create a `plugins.zip`
<a name="samples-plugins-timezone-plugins-zip"></a>

The following steps explain how to create `plugins.zip`. The content of this example can be combined with other plugins and binaries into a single `plugins.zip` file.

1. In your command prompt, navigate to the `plugins` directory from the previous step. For example:

   ```
   cd plugins
   ```

1. Zip the contents within your `plugins` directory.

   ```
   zip -r ../plugins.zip ./
   ```

1. Upload `plugins.zip` to your S3 bucket.

   ```
   aws s3 cp plugins.zip s3://your-mwaa-bucket/
   ```

## Code sample
<a name="samples-plugins-timezone-dag"></a>

To change the default timezone (UTC+0) in which the DAG runs, we'll use a library called [Pendulum](https://pypi.org/project/pendulum/), a Python library for working with timezone-aware datetime.

1. In your command prompt, navigate to the directory where your DAGs are stored. For example:

   ```
   cd dags
   ```

1. Copy the content of the following example and save as `tz-aware-dag.py`.

   ```
   from airflow import DAG
   from airflow.operators.bash import BashOperator
   from datetime import datetime, timedelta
   # Import the Pendulum library.
   import pendulum
   
   # Instantiate Pendulum and set your timezone.
   local_tz = pendulum.timezone("America/Los_Angeles")
   
   with DAG(
       dag_id = "tz_test",
       schedule_interval="0 12 * * *",
       catchup=False,
       start_date=datetime(2022, 1, 1, tzinfo=local_tz)
   ) as dag:
       bash_operator_task = BashOperator(
           task_id="tz_aware_task",
           dag=dag,
           bash_command="date"
       )
   ```

1. Run the following AWS CLI command to copy the DAG to your environment's bucket, then trigger the DAG using the Apache Airflow UI.

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. If successful, you'll see output similar to the following in the task logs for the `tz_aware_task` in the `tz_test` DAG:

   ```
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
   [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
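
The sample log shows a local time of 12:00 PDT alongside an `execution_date` in the 19:00 UTC hour. To see how a local schedule time maps to UTC across daylight saving changes, you can run the following sketch. It uses the standard library `zoneinfo` module (Python 3.9 and later) instead of Pendulum so that it runs without extra packages, and `local_noon_in_utc` is a hypothetical helper.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def local_noon_in_utc(year, month, day, tz_name="America/Los_Angeles"):
    """Return 12:00 local time on the given date, converted to UTC."""
    local = datetime(year, month, day, 12, 0, tzinfo=ZoneInfo(tz_name))
    return local.astimezone(timezone.utc)


if __name__ == "__main__":
    # Daylight time (PDT) in August, standard time (PST) in January.
    print(local_noon_in_utc(2022, 8, 1).isoformat())
    print(local_noon_in_utc(2022, 1, 1).isoformat())
```

The UTC hour differs between summer and winter, which is the reason to attach a named timezone to `start_date` rather than a fixed offset.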

## What's next?
<a name="samples-plugins-timezone-plugins-next-up"></a>
+ Learn more about how to upload the `plugins.zip` file in this example to your Amazon S3 bucket in [Installing custom plugins](configuring-dag-import-plugins.md).

# Refreshing a CodeArtifact token
<a name="samples-code-artifact"></a>

If you're using CodeArtifact to install Python dependencies, Amazon MWAA requires an active token. To allow Amazon MWAA to access a CodeArtifact repository at runtime, you can use a [startup script](using-startup-script.md) and set the pip [`--extra-index-url`](https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-extra-index-url) option with the token.

The following topic describes how you can create a startup script that uses the [`get_authorization_token`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/codeartifact.html#CodeArtifact.Client.get_authorization_token) CodeArtifact API operation to retrieve a fresh token every time your environment starts up or updates.

**Topics**
+ [Version](#samples-code-artifact-version)
+ [Prerequisites](#samples-code-artifact-prereqs)
+ [Permissions](#samples-code-artifact-permissions)
+ [Code sample](#samples-code-artifact-code)
+ [What's next?](#samples-code-artifact-next-up)

## Version
<a name="samples-code-artifact-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-code-artifact-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ An [Amazon MWAA environment](get-started.md).
+ A [CodeArtifact repository](https://docs.aws.amazon.com/codeartifact/latest/ug/create-repo.html) where you store dependencies for your environment.

## Permissions
<a name="samples-code-artifact-permissions"></a>

To refresh the CodeArtifact token and write the result to Amazon S3, Amazon MWAA must have the following permissions in the execution role.
+ The `codeartifact:GetAuthorizationToken` action allows Amazon MWAA to retrieve a new token from CodeArtifact. The following policy grants permission for every CodeArtifact domain you create. You can further restrict access to your domains by modifying the resource value in the statement, and specifying only the domains that you want your environment to access.

  ```
  {
    "Effect": "Allow",
    "Action": "codeartifact:GetAuthorizationToken",
    "Resource": "arn:aws:codeartifact:us-west-2:*:domain/*"
  }
  ```
+ The `sts:GetServiceBearerToken` action is required to call the CodeArtifact [`GetAuthorizationToken`](https://docs.aws.amazon.com/codeartifact/latest/APIReference/API_GetAuthorizationToken.html) API operation. This operation returns a token that you must use when using a package manager such as `pip` with CodeArtifact. To use a package manager with a CodeArtifact repository, your environment's execution role must allow `sts:GetServiceBearerToken` as listed in the following policy statement.

  ```
  {
    "Sid": "AllowServiceBearerToken",
    "Effect": "Allow",
    "Action": "sts:GetServiceBearerToken",
    "Resource": "*"
  }
  ```

## Code sample
<a name="samples-code-artifact-code"></a>

The following steps describe how you can create a startup script that updates the CodeArtifact token.

1. Copy the contents of the following code sample and save locally as `code_artifact_startup_script.sh`.

   ```
   #!/bin/sh
   
   # Startup script for MWAA, refer to https://docs.aws.amazon.com/mwaa/latest/userguide/using-startup-script.html
   
   set -eu
   
   # setup code artifact endpoint and token
   # https://pip.pypa.io/en/stable/cli/pip_install/#cmdoption-0
   # https://docs.aws.amazon.com/mwaa/latest/userguide/samples-code-artifact.html
   DOMAIN="amazon"
   DOMAIN_OWNER="112233445566"
   REGION="us-west-2"
   REPO_NAME="MyRepo"
   echo "Getting token for CodeArtifact with args: --domain $DOMAIN --region $REGION --domain-owner $DOMAIN_OWNER"
   TOKEN=$(aws codeartifact get-authorization-token --domain "$DOMAIN" --region "$REGION" --domain-owner "$DOMAIN_OWNER" | jq -r '.authorizationToken')
   echo "Setting Pip env var for '--extra-index-url' to point to CodeArtifact"
   export PIP_EXTRA_INDEX_URL="https://aws:$TOKEN@$DOMAIN-$DOMAIN_OWNER.d.codeartifact.$REGION.amazonaws.com/pypi/$REPO_NAME/simple/"
   echo "CodeArtifact startup setup complete"
   ```

1. Navigate to the folder where you saved the script. Use `cp` in a new prompt window to upload the script to your bucket. Replace *amzn-s3-demo-bucket* with your information.

   ```
   aws s3 cp code_artifact_startup_script.sh s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   If successful, Amazon S3 outputs the URL path to the object:

   ```
   upload: ./code_artifact_startup_script.sh to s3://amzn-s3-demo-bucket/code_artifact_startup_script.sh
   ```

   After you upload the script, your environment updates and runs the script at startup.
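
The index URL that the startup script exports follows a fixed pattern. As a minimal sketch, the following Python function composes the same value from its parts. `build_pip_index_url` is a hypothetical helper, the token is a dummy string, and the host name assumes the standard `amazonaws.com` endpoint suffix for CodeArtifact.

```python
from urllib.parse import quote


def build_pip_index_url(domain, domain_owner, region, repo_name, token):
    """Compose the CodeArtifact PyPI index URL used for PIP_EXTRA_INDEX_URL."""
    host = f"{domain}-{domain_owner}.d.codeartifact.{region}.amazonaws.com"
    # The token acts as the password for the fixed user name "aws".
    return f"https://aws:{quote(token, safe='')}@{host}/pypi/{repo_name}/simple/"


if __name__ == "__main__":
    # Placeholder values that mirror the variables in the startup script.
    print(build_pip_index_url("amazon", "112233445566", "us-west-2", "MyRepo", "EXAMPLE_TOKEN"))
```

Because the token is embedded in the URL, avoid writing the composed value to logs.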

## What's next?
<a name="samples-code-artifact-next-up"></a>
+ Learn how to use startup scripts to customize your environment in [Using a startup script with Amazon MWAA](using-startup-script.md).
+ Learn how to upload the DAG code in this example to the `dags` folder in your Amazon S3 bucket in [Adding or updating DAGs](configuring-dag-folder.md).
+ Learn more about how to upload the `plugins.zip` file in this example to your Amazon S3 bucket in [Installing custom plugins](configuring-dag-import-plugins.md).

# Creating a custom plugin with Apache Hive and Hadoop
<a name="samples-hive"></a>

Amazon MWAA extracts the contents of a `plugins.zip` to `/usr/local/airflow/plugins`. You can use this to add binaries to your containers. In addition, Apache Airflow executes the contents of Python files in the `plugins` folder at *startup*, which lets you set and modify environment variables. The following sample walks you through the steps to create a custom plugin using Apache Hive and Hadoop on an Amazon Managed Workflows for Apache Airflow environment. You can combine it with other custom plugins and binaries.

**Topics**
+ [Version](#samples-hive-version)
+ [Prerequisites](#samples-hive-prereqs)
+ [Permissions](#samples-hive-permissions)
+ [Requirements](#samples-hive-dependencies)
+ [Download dependencies](#samples-hive-install)
+ [Custom plugin](#samples-hive-plugins-code)
+ [Plugins.zip](#samples-hive-pluginszip)
+ [Code sample](#samples-hive-code)
+ [Airflow configuration options](#samples-hive-airflow-config)
+ [What's next?](#samples-hive-next-up)

## Version
<a name="samples-hive-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-hive-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ An [Amazon MWAA environment](get-started.md).

## Permissions
<a name="samples-hive-permissions"></a>

No additional permissions are required to use the code example on this page.

## Requirements
<a name="samples-hive-dependencies"></a>

To use the sample code on this page, add the following dependencies to your `requirements.txt`. To learn more, refer to [Installing Python dependencies](working-dags-dependencies.md).

```
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt
apache-airflow-providers-amazon[apache.hive]
```

## Download dependencies
<a name="samples-hive-install"></a>

Amazon MWAA will extract the contents of plugins.zip into `/usr/local/airflow/plugins` on each Amazon MWAA scheduler and worker container. This is used to add binaries to your environment. The following steps describe how to assemble the files needed for the custom plugin.

1. In your command prompt, navigate to the directory where you would like to create your plugin. For example:

   ```
   cd plugins
   ```

1. Download [Hadoop](https://hadoop.apache.org/) from a [mirror](https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz), for example:

   ```
   wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
   ```

1. Download [Hive](https://hive.apache.org/) from a [mirror](https://www.apache.org/dyn/closer.cgi/hive/), for example:

   ```
   wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
   ```

1. Create a directory. For example:

   ```
   mkdir hive_plugin
   ```

1. Extract Hadoop.

   ```
   tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
   ```

1. Extract Hive.

   ```
   tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin
   ```
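
If you prefer to script the extraction steps, the following is a minimal sketch that uses the Python standard library `tarfile` module instead of `tar`. The helper name `extract_archives` is hypothetical, and the sketch assumes that the archives are already downloaded to the current directory.

```python
import tarfile
from pathlib import Path


def extract_archives(archives, dest="hive_plugin"):
    """Extract each .tar.gz archive into dest, creating dest if needed."""
    dest_path = Path(dest)
    dest_path.mkdir(parents=True, exist_ok=True)
    for archive in archives:
        with tarfile.open(archive, "r:gz") as tar:
            tar.extractall(dest_path)
    # Return the top-level entries so that you can confirm what was unpacked.
    return sorted(p.name for p in dest_path.iterdir())
```

For the archives in this example, you might call `extract_archives(["hadoop-3.3.0.tar.gz", "apache-hive-3.1.2-bin.tar.gz"])`. Only extract archives that you downloaded from a source you trust.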

## Custom plugin
<a name="samples-hive-plugins-code"></a>

Apache Airflow will execute the contents of Python files in the plugins folder at startup. This is used to set and modify environment variables. The following steps describe the sample code for the custom plugin.

1. In your command prompt, navigate to the `hive_plugin` directory. For example:

   ```
   cd hive_plugin
   ```

1. Copy the contents of the following code sample and save locally as `hive_plugin.py` in the `hive_plugin` directory.

   ```
   import os

   from airflow.plugins_manager import AirflowPlugin

   # Point the environment at the Java runtime and the extracted Hadoop and Hive folders
   os.environ["JAVA_HOME"]="/usr/lib/jvm/jre"
   os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0'
   os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop'
   os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin'
   os.environ["PATH"] = os.getenv("PATH", "") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib"
   # CLASSPATH might not be set, so default to an empty string before appending
   os.environ["CLASSPATH"] = os.getenv("CLASSPATH", "") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib"

   class EnvVarPlugin(AirflowPlugin):
       name = 'hive_plugin'

1. Copy the contents of the following text and save locally as `.airflowignore` in the `hive_plugin` directory.

   ```
   hadoop-3.3.0
   apache-hive-3.1.2-bin
   ```
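
The plugin appends entries to colon-separated environment variables such as `PATH` and `CLASSPATH`. The following is a minimal sketch of one way to do this that tolerates a variable that isn't set yet and skips entries that are already present. The helper `append_to_env_path` and its `env` parameter are hypothetical and not part of Apache Airflow.

```python
import os


def append_to_env_path(name, *entries, env=os.environ):
    """Append entries to a colon-separated variable, even if it is unset."""
    current = env.get(name, "")
    parts = [p for p in current.split(":") if p]
    for entry in entries:
        # Skip entries that are already present to avoid duplicates.
        if entry not in parts:
            parts.append(entry)
    env[name] = ":".join(parts)
    return env[name]
```

In the plugin file, you could call `append_to_env_path("CLASSPATH", "/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib")` in place of the string concatenation.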

## Plugins.zip
<a name="samples-hive-pluginszip"></a>

The following steps explain how to create `plugins.zip`. The contents of this example can be combined with other plugins and binaries into a single `plugins.zip` file.

1. In your command prompt, navigate to the `hive_plugin` directory from the previous step. For example:

   ```
   cd hive_plugin
   ```

1. Zip the contents of your `hive_plugin` folder.

   ```
   zip -r ../hive_plugin.zip ./
   ```
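
As an alternative to the `zip` command, the following minimal sketch builds the archive with the Python standard library `zipfile` module. The helper name `build_plugins_zip` is hypothetical. The sketch assumes that the output path is outside the source folder, so that the archive doesn't include itself.

```python
import zipfile
from pathlib import Path


def build_plugins_zip(source_dir, zip_path):
    """Zip the contents of source_dir so that its files sit at the archive root."""
    source = Path(source_dir)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(source.rglob("*")):
            if path.is_file():
                # Store paths relative to source_dir, as `zip -r` does from inside it.
                zf.write(path, path.relative_to(source).as_posix())
    # Reopen the archive and return its entries for a quick check.
    with zipfile.ZipFile(zip_path) as zf:
        return zf.namelist()
```

For example, `build_plugins_zip("hive_plugin", "hive_plugin.zip")` run from the parent directory produces an archive with `hive_plugin.py` at its root.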

## Code sample
<a name="samples-hive-code"></a>

The following steps describe how to create the DAG code that will test the custom plugin.

1. In your command prompt, navigate to the directory where your DAG code is stored. For example:

   ```
   cd dags
   ```

1. Copy the contents of the following code sample and save locally as `hive.py`.

   ```
   from airflow import DAG
   from airflow.operators.bash import BashOperator
   from airflow.utils.dates import days_ago
   
   with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       hive_test = BashOperator(
           task_id="hive_test",
           bash_command='hive --help'
       )
   ```

## Airflow configuration options
<a name="samples-hive-airflow-config"></a>

If you're using Apache Airflow v2, add `core.lazy_load_plugins : False` as an Apache Airflow configuration option. To learn more, refer to [Using configuration options to load plugins in Apache Airflow v2](configuring-env-variables.md#configuring-2.0-airflow-override).

## What's next?
<a name="samples-hive-next-up"></a>
+ Learn how to upload the `requirements.txt` file in this example to your Amazon S3 bucket in [Installing Python dependencies](working-dags-dependencies.md).
+ Learn how to upload the DAG code in this example to the `dags` folder in your Amazon S3 bucket in [Adding or updating DAGs](configuring-dag-folder.md).
+ Learn more about how to upload the `plugins.zip` file in this example to your Amazon S3 bucket in [Installing custom plugins](configuring-dag-import-plugins.md).

# Creating a custom plugin for Apache Airflow PythonVirtualenvOperator
<a name="samples-virtualenv"></a>

The following sample explains how to patch the Apache Airflow `PythonVirtualenvOperator` with a custom plugin on Amazon Managed Workflows for Apache Airflow.

**Topics**
+ [Version](#samples-virtualenv-version)
+ [Prerequisites](#samples-virtualenv-prereqs)
+ [Permissions](#samples-virtualenv-permissions)
+ [Requirements](#samples-virtualenv-dependencies)
+ [Custom plugin sample code](#samples-virtualenv-plugins-code)
+ [Plugins.zip](#samples-virtualenv-pluginszip)
+ [Code sample](#samples-virtualenv-code)
+ [Airflow configuration options](#samples-virtualenv-airflow-config)
+ [What's next?](#samples-virtualenv-next-up)

## Version
<a name="samples-virtualenv-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-virtualenv-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ An [Amazon MWAA environment](get-started.md).

## Permissions
<a name="samples-virtualenv-permissions"></a>

No additional permissions are required to use the code example on this page.

## Requirements
<a name="samples-virtualenv-dependencies"></a>

To use the sample code on this page, add the following dependencies to your `requirements.txt`. To learn more, refer to [Installing Python dependencies](working-dags-dependencies.md).

```
virtualenv
```

## Custom plugin sample code
<a name="samples-virtualenv-plugins-code"></a>

Apache Airflow will execute the contents of Python files in the plugins folder at startup. This plugin will patch the built-in `PythonVirtualenvOperator` during that startup process to make it compatible with Amazon MWAA. The following steps display the sample code for the custom plugin.

1. In your command prompt, navigate to the `plugins` directory in the previous section. For example:

   ```
   cd plugins
   ```

1. Copy the contents of the following code sample and save locally as `virtual_python_plugin.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow.plugins_manager import AirflowPlugin
   import airflow.utils.python_virtualenv 
   from typing import List
   
   def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
       # The python3.7 path segment depends on the Python version; adjust it to match your environment
       cmd = ['python3','/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
       if system_site_packages:
           cmd.append('--system-site-packages')
       if python_bin is not None:
           cmd.append(f'--python={python_bin}')
       return cmd
   
   airflow.utils.python_virtualenv._generate_virtualenv_cmd=_generate_virtualenv_cmd
   
   class VirtualPythonPlugin(AirflowPlugin):                
       name = 'virtual_python_plugin'
   ```
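
The patched function hard-codes the Python version in the path to the `virtualenv` package. As a minimal sketch of an alternative, the following variant derives that path segment from the running interpreter. The function name `generate_virtualenv_cmd` and the `site_root` parameter are hypothetical, and the default install prefix is an assumption carried over from the sample above.

```python
import sys
from typing import List, Optional


def generate_virtualenv_cmd(
    tmp_dir: str,
    python_bin: Optional[str],
    system_site_packages: bool,
    site_root: str = "/usr/local/airflow/.local/lib",  # assumed install prefix
) -> List[str]:
    """Build the virtualenv command, deriving the python3.X folder at runtime."""
    version_dir = f"python{sys.version_info.major}.{sys.version_info.minor}"
    cmd = ["python3", f"{site_root}/{version_dir}/site-packages/virtualenv", tmp_dir]
    if system_site_packages:
        cmd.append("--system-site-packages")
    if python_bin is not None:
        cmd.append(f"--python={python_bin}")
    return cmd
```

This keeps the same arguments as the original function, so it could be assigned to `airflow.utils.python_virtualenv._generate_virtualenv_cmd` in the same way, provided that your Apache Airflow version still exposes that function.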

## Plugins.zip
<a name="samples-virtualenv-pluginszip"></a>

The following steps explain how to create the `plugins.zip`.

1. In your command prompt, navigate to the directory containing `virtual_python_plugin.py` in the previous section. For example:

   ```
   cd plugins
   ```

1. Zip the contents within your `plugins` folder.

   ```
   zip plugins.zip virtual_python_plugin.py
   ```

## Code sample
<a name="samples-virtualenv-code"></a>

The following steps describe how to create the DAG code for the custom plugin.

1. In your command prompt, navigate to the directory where your DAG code is stored. For example:

   ```
   cd dags
   ```

1. Copy the contents of the following code sample and save locally as `virtualenv_test.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
    
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
    
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   from airflow import DAG
   from airflow.operators.python import PythonVirtualenvOperator
   from airflow.utils.dates import days_ago
   import os
   
   os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/bin"
   
   def virtualenv_fn():
       import boto3
       print("boto3 version ",boto3.__version__)
   
   with DAG(dag_id="virtualenv_test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
       virtualenv_task = PythonVirtualenvOperator(
           task_id="virtualenv_task",
           python_callable=virtualenv_fn,
           requirements=["boto3>=1.17.43"],
           system_site_packages=False,
           dag=dag,
       )
   ```

## Airflow configuration options
<a name="samples-virtualenv-airflow-config"></a>

If you're using Apache Airflow v2, add `core.lazy_load_plugins : False` as an Apache Airflow configuration option. To learn more, refer to [Using configuration options to load plugins in Apache Airflow v2](configuring-env-variables.md#configuring-2.0-airflow-override).

## What's next?
<a name="samples-virtualenv-next-up"></a>
+ Learn how to upload the `requirements.txt` file in this example to your Amazon S3 bucket in [Installing Python dependencies](working-dags-dependencies.md).
+ Learn how to upload the DAG code in this example to the `dags` folder in your Amazon S3 bucket in [Adding or updating DAGs](configuring-dag-folder.md).
+ Learn more about how to upload the `plugins.zip` file in this example to your Amazon S3 bucket in [Installing custom plugins](configuring-dag-import-plugins.md).

# Invoking DAGs with a Lambda function
<a name="samples-lambda"></a>

The following code example uses an [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html) function to get an Apache Airflow CLI token and invoke a directed acyclic graph (DAG) in an Amazon MWAA environment.

**Topics**
+ [Version](#samples-lambda-version)
+ [Prerequisites](#samples-lambda-prereqs)
+ [Permissions](#samples-lambda-permissions)
+ [Dependencies](#samples-lambda-dependencies)
+ [Code example](#samples-lambda-code)

## Version
<a name="samples-lambda-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-lambda-prereqs"></a>

To use this code example, you must:
+ Use the [public network access mode](configuring-networking.md#webserver-options-public-network-onconsole) for your [Amazon MWAA environment](get-started.md).
+ Have a [Lambda function](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html) using the latest Python runtime.

**Note**  
If the Lambda function and your Amazon MWAA environment are in the same VPC, you can use this code on a private network. For this configuration, the Lambda function's execution role needs permission to call the Amazon Elastic Compute Cloud (Amazon EC2) **CreateNetworkInterface** API operation. You can provide this permission using the [AWSLambdaVPCAccessExecutionRole](https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor) AWS-managed policy.

## Permissions
<a name="samples-lambda-permissions"></a>

To use the code example on this page, your Amazon MWAA environment's execution role needs access to perform the `airflow:CreateCliToken` action. You can provide this permission using the `AmazonMWAAAirflowCliAccess` AWS-managed policy:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

For more information, refer to [Apache Airflow CLI policy: AmazonMWAAAirflowCliAccess](access-policies.md#cli-access).

## Dependencies
<a name="samples-lambda-dependencies"></a>

To use this code example with Apache Airflow v2 and later, no additional dependencies are required. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) to install Apache Airflow.

## Code example
<a name="samples-lambda-code"></a>

1. Open the AWS Lambda console at [https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/).

1. Choose your Lambda function from the **Functions** list.

1. On the function page, copy the following code and replace the following with the names of your resources:
   + `YOUR_ENVIRONMENT_NAME` – The name of your Amazon MWAA environment.
   + `YOUR_DAG_NAME` – The name of the DAG that you want to invoke.

   ```
   import base64
   import http.client
   import json
   
   import boto3
   
   mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
   dag_name = 'YOUR_DAG_NAME'
   mwaa_cli_command = 'dags trigger'
   
   client = boto3.client('mwaa')
   
   def lambda_handler(event, context):
       # Request a short-lived CLI token and the web server hostname
       mwaa_cli_token = client.create_cli_token(
           Name=mwaa_env_name
       )
       
       # Send the Apache Airflow CLI command to the environment's CLI endpoint
       conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
       payload = mwaa_cli_command + " " + dag_name
       headers = {
         'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
         'Content-Type': 'text/plain'
       }
       conn.request("POST", "/aws_mwaa/cli", payload, headers)
       res = conn.getresponse()
       data = res.read()
       
       # The response body is JSON, and its stdout field is base64-encoded.
       # Decode to a string so that Lambda can serialize the return value.
       mydata = json.loads(data.decode("UTF-8"))
       return base64.b64decode(mydata['stdout']).decode("UTF-8")

1. Choose **Deploy**.

1. Choose **Test** to invoke your function using the Lambda console.

1. To verify that your Lambda successfully invoked your DAG, use the Amazon MWAA console to navigate to your environment's Apache Airflow UI, then do the following:

   1. On the **DAGs** page, locate your new target DAG in the list of DAGs.

   1. Under **Last Run**, check the timestamp for the latest DAG run. This timestamp should closely match the time at which you invoked your Lambda function.

   1. Under **Recent Tasks**, check that the last run was successful.
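
The Lambda function above reads the `stdout` field of the CLI response and decodes it from base64. The following is a minimal sketch that isolates that decoding step so that you can test it without calling AWS. The helper name `decode_cli_response` is hypothetical, and the presence of a `stderr` field alongside `stdout` is an assumption about the response shape.

```python
import base64
import json


def decode_cli_response(body: bytes) -> dict:
    """Decode the base64-encoded stdout and stderr fields of a CLI response body."""
    parsed = json.loads(body.decode("utf-8"))
    return {
        # A missing field decodes to an empty string.
        key: base64.b64decode(parsed.get(key, "")).decode("utf-8")
        for key in ("stdout", "stderr")
    }
```

Inspecting the decoded `stderr` value can help when a command returns an empty `stdout`.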

# Invoking DAGs in different Amazon MWAA environments
<a name="samples-invoke-dag"></a>

The following code example creates an Apache Airflow CLI token. The code then uses a directed acyclic graph (DAG) in one Amazon MWAA environment to invoke a DAG in a different Amazon MWAA environment.

**Topics**
+ [Version](#samples-invoke-dag-version)
+ [Prerequisites](#samples-invoke-dag-prereqs)
+ [Permissions](#samples-invoke-dag-permissions)
+ [Dependencies](#samples-invoke-dag-dependencies)
+ [Code example](#samples-invoke-dag-code)

## Version
<a name="samples-invoke-dag-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-invoke-dag-prereqs"></a>

To use the code example on this page, you need the following:
+ Two [Amazon MWAA environments](get-started.md) with **public network** webserver access, including your current environment.
+ A sample DAG uploaded to your target environment's Amazon Simple Storage Service (Amazon S3) bucket.

## Permissions
<a name="samples-invoke-dag-permissions"></a>

To use the code example on this page, your environment's execution role must have permission to create an Apache Airflow CLI token. You can attach the AWS-managed policy `AmazonMWAAAirflowCliAccess` to grant this permission.

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

For more information, refer to [Apache Airflow CLI policy: AmazonMWAAAirflowCliAccess](access-policies.md#cli-access).

## Dependencies
<a name="samples-invoke-dag-dependencies"></a>

To use this code example with Apache Airflow v2 and later, no additional dependencies are required. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) to install Apache Airflow.

## Code example
<a name="samples-invoke-dag-code"></a>

The following code example assumes that you're using a DAG in your current environment to invoke a DAG in another environment.

1. In your terminal, navigate to the directory where your DAG code is stored. For example:

   ```
   cd dags
   ```

1. Copy the content of the following code example and save it locally as `invoke_dag.py`. Replace the following values with your information.
   + `your-new-environment-name` – The name of the other environment where you want to invoke the DAG.
   + `your-target-dag-id` – The ID of the DAG in the other environment that you want to invoke.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1. Run the following AWS CLI command to copy the DAG to your environment's bucket, then trigger the DAG using the Apache Airflow UI.

   ```
   aws s3 cp invoke_dag.py s3://your-environment-bucket/dags/
   ```

1. If the DAG runs successfully, you'll get output similar to the following in the task logs for `invoke_dag_task`.

   ```
   [2022-01-01, 12:00:00 PDT] {python.py:152} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {taskinstance.py:1280} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {local_task_job.py:154} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   To verify that your DAG was successfully invoked, navigate to the Apache Airflow UI for your new environment, then do the following:

   1. On the **DAGs** page, locate your new target DAG in the list of DAGs.

   1. Under **Last Run**, check the timestamp for the latest DAG run. This timestamp should closely match the latest timestamp for `invoke_dag` in your other environment.

   1. Under **Recent Tasks**, check that the last run was successful.
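
The task in `invoke_dag.py` builds the request inline and doesn't inspect the response. As a minimal sketch, the following helper separates the request construction so that it can be tested on its own. The function name `build_trigger_request` is hypothetical; the URL path, headers, and body mirror the sample above.

```python
def build_trigger_request(hostname: str, cli_token: str, dag_id: str):
    """Return the URL, headers, and body for a `dags trigger` CLI request."""
    url = f"https://{hostname}/aws_mwaa/cli"
    headers = {
        "Authorization": f"Bearer {cli_token}",
        "Content-Type": "text/plain",
    }
    body = f"dags trigger {dag_id}"
    return url, headers, body
```

Inside the task, you could pass these values to `requests.post(url, data=body, headers=headers)` and call `raise_for_status()` on the response, so that an HTTP error fails the task instead of passing silently.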

# Using Amazon MWAA with Amazon RDS for Microsoft SQL Server
<a name="samples-sql-server"></a>

You can use Amazon Managed Workflows for Apache Airflow to connect to an [RDS for SQL Server](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_SQLServer.html) database. The following sample code uses DAGs on an Amazon Managed Workflows for Apache Airflow environment to connect to and run queries on an Amazon RDS for Microsoft SQL Server database.

**Topics**
+ [Version](#samples-sql-server-version)
+ [Prerequisites](#samples-sql-server-prereqs)
+ [Dependencies](#samples-sql-server-dependencies)
+ [Apache Airflow v2 connection](#samples-sql-server-conn)
+ [Code sample](#samples-sql-server-code)
+ [What's next?](#samples-sql-server-next-up)

## Version
<a name="samples-sql-server-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-sql-server-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ An [Amazon MWAA environment](get-started.md).
+ Amazon MWAA and the RDS for SQL Server instance running in the same Amazon VPC.
+ VPC security groups for Amazon MWAA and the database server configured with one of the following rules:
  + An inbound rule that opens port `1433` to Amazon RDS in the Amazon MWAA security group
  + An outbound rule that opens port `1433` from Amazon MWAA to Amazon RDS
+ An Apache Airflow connection for RDS for SQL Server that reflects the hostname, port, username, and password of the Amazon RDS for SQL Server database that you created previously.
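
To confirm that the security group rules allow traffic on port `1433`, you can run a simple TCP check from a task in your environment. The following is a minimal sketch that uses only the Python standard library. The helper name `can_reach` is hypothetical, and a successful connection only shows that the port is reachable, not that the credentials are valid.

```python
import socket


def can_reach(host: str, port: int = 1433, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

You could call `can_reach("YOUR_DB_HOST")` from a `PythonOperator` task and log the result before you run the SQL tasks.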

## Dependencies
<a name="samples-sql-server-dependencies"></a>

To use the sample code in this section, add the following dependencies to your `requirements.txt`. To learn more, refer to [Installing Python dependencies](working-dags-dependencies.md).

```
apache-airflow-providers-microsoft-mssql==1.0.1
apache-airflow-providers-odbc==1.0.1
pymssql==2.2.1
```

## Apache Airflow v2 connection
<a name="samples-sql-server-conn"></a>

If you're using a connection in Apache Airflow v2, ensure the Airflow connection object includes the following key-value pairs:

1. **Conn Id:** mssql\_default

1. **Conn Type:** Amazon Web Services

1. **Host:** `YOUR_DB_HOST`

1. **Schema:**

1. **Login:** admin

1. **Password:**

1. **Port:** 1433

1. **Extra:**

## Code sample
<a name="samples-sql-server-code"></a>

1. In your command prompt, navigate to the directory where your DAG code is stored. For example:

   ```
   cd dags
   ```

1. Copy the contents of the following code sample and save locally as `sql-server.py`.

   ```
   """
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   Permission is hereby granted, free of charge, to any person obtaining a copy of
   this software and associated documentation files (the "Software"), to deal in
   the Software without restriction, including without limitation the rights to
   use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
   the Software, and to permit persons to whom the Software is furnished to do so.
   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   """
   import pymssql
   import logging
   import sys
   from airflow import DAG
   from datetime import datetime
   from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
   from airflow.operators.python import PythonOperator
   
   default_args = {
       'owner': 'aws',
       'depends_on_past': False,
       'start_date': datetime(2019, 2, 20),
       'provide_context': True
   }
   
   dag = DAG(
       'mssql_conn_example', default_args=default_args, schedule_interval=None)
       
   drop_db = MsSqlOperator(
      task_id="drop_db",
      sql="DROP DATABASE IF EXISTS testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_db = MsSqlOperator(
      task_id="create_db",
      sql="create database testdb;",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   create_table = MsSqlOperator(
      task_id="create_table",
      sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   insert_into_table = MsSqlOperator(
      task_id="insert_into_table",
      sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');",
      mssql_conn_id="mssql_default",
      autocommit=True,
      dag=dag
   )
   
   def select_pet(**kwargs):
       try:
           conn = pymssql.connect(
               server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com',
               user='admin',
               password='<yoursupersecretpassword>',
               database='testdb'
           )
           
           # Create a cursor from the connection
           cursor = conn.cursor()
           cursor.execute("SELECT * from testdb.dbo.pet")
           row = cursor.fetchone()
           
           if row:
               print(row)
       except Exception:
           # Log the full traceback, then re-raise so that the task is marked as failed
           logging.exception("Error when querying the database with pymssql")
           raise
   
   select_query = PythonOperator(
       task_id='select_query',
       python_callable=select_pet,
       dag=dag,
   )
   
   drop_db >> create_db >> create_table >> insert_into_table >> select_query
   ```

## What's next?
<a name="samples-sql-server-next-up"></a>
+ Learn how to upload the `requirements.txt` file in this example to your Amazon S3 bucket in [Installing Python dependencies](working-dags-dependencies.md).
+ Learn how to upload the DAG code in this example to the `dags` folder in your Amazon S3 bucket in [Adding or updating DAGs](configuring-dag-folder.md).
+ Explore example scripts and other [pymssql module examples](https://pymssql.readthedocs.io/en/stable/pymssql_examples.html).
+ Learn more about executing SQL code in a specific Microsoft SQL database using the [mssql\_operator](https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/mssql_operator/index.html?highlight=mssqloperator#airflow.operators.mssql_operator.MsSqlOperator) in the *Apache Airflow reference guide*.

# Using Amazon MWAA with Amazon EKS
<a name="mwaa-eks-example"></a>

The following sample demonstrates how to use Amazon Managed Workflows for Apache Airflow with Amazon EKS.

**Topics**
+ [Version](#mwaa-eks-example-version)
+ [Prerequisites](#eksctl-prereqs)
+ [Create a public key for Amazon EC2](#eksctl-create-key)
+ [Create the cluster](#create-cluster-eksctl)
+ [Create a `mwaa` namespace](#eksctl-namespace)
+ [Create a role for the `mwaa` namespace](#eksctl-role)
+ [Create and attach an IAM role for the Amazon EKS cluster](#eksctl-iam-role)
+ [Create the requirements.txt file](#eksctl-requirements)
+ [Create an identity mapping for Amazon EKS](#eksctl-identity-map)
+ [Create the `kubeconfig`](#eksctl-kube-config)
+ [Create a DAG](#eksctl-create-dag)
+ [Add the DAG and `kube_config.yaml` to the Amazon S3 bucket](#eksctl-dag-bucket)
+ [Enable and trigger the example](#eksctl-trigger-pod)

## Version
<a name="mwaa-eks-example-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="eksctl-prereqs"></a>

To use the example in this topic, you'll need the following:
+ An [Amazon MWAA environment](get-started.md).
+ eksctl. To learn more, refer to [Install eksctl](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html#install-eksctl).
+ kubectl. To learn more, refer to [Install and Set Up kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/). In some cases, kubectl is installed with eksctl.
+ An EC2 key pair in the Region where you create your Amazon MWAA environment. To learn more, refer to [Creating or importing a key pair](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#prepare-key-pair).

**Note**  
When you use an `eksctl` command, you can include a `--profile` to specify a profile other than the default.

## Create a public key for Amazon EC2
<a name="eksctl-create-key"></a>

Use the following command to create a public key from your private key pair.

```
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
```

To learn more, refer to [Retrieving the public key for your key pair](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#retrieving-the-public-key).

## Create the cluster
<a name="create-cluster-eksctl"></a>

Use the following command to create the cluster. If you want a custom name for the cluster or to create it in a different Region, replace the name and Region values. You must create the cluster in the same Region where you create the Amazon MWAA environment. Replace the values for the subnets to match the subnets in your Amazon VPC network that you use for Amazon MWAA. Replace the value for the `ssh-public-key` to match the key you use. You can use an existing key from Amazon EC2 that is in the same Region, or create a new key in the same Region where you create your Amazon MWAA environment.

```
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey \
--managed \
--vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \
--vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"
```

It takes some time to complete creating the cluster. Once complete, you can verify that the cluster was created successfully and has the IAM OIDC Provider configured by using the following command:

```
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
```
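The values to replace in the `eksctl create cluster` command (name, Region, key, and subnets) can be made explicit by assembling the argument list in code. The following is a minimal sketch that reproduces the flags from the command in this section. The function name `eksctl_create_cluster_args` and its parameter names are assumptions made for illustration.

```python
import shlex


def eksctl_create_cluster_args(name, region, ssh_public_key, public_subnets, private_subnets):
    """Return the argument list for the `eksctl create cluster` command in this section."""
    return [
        "eksctl", "create", "cluster",
        "--name", name,
        "--region", region,
        "--version", "1.18",
        "--nodegroup-name", "linux-nodes",
        "--nodes", "3",
        "--nodes-min", "1",
        "--nodes-max", "4",
        "--with-oidc",
        "--ssh-access",
        "--ssh-public-key", ssh_public_key,
        "--managed",
        # Subnet IDs are passed as a single comma-separated value
        "--vpc-public-subnets", ",".join(public_subnets),
        "--vpc-private-subnets", ",".join(private_subnets),
    ]


if __name__ == "__main__":
    args = eksctl_create_cluster_args(
        "mwaa-eks",
        "us-west-2",
        "MyPublicKey",
        ["subnet-11111111111111111", "subnet-22222222222222222"],
        ["subnet-33333333333333333", "subnet-44444444444444444"],
    )
    # Print a shell-quoted command line that you can review before running it
    print(shlex.join(args))
```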

## Create a `mwaa` namespace
<a name="eksctl-namespace"></a>

After confirming that the cluster was successfully created, use the following command to create a namespace for the pods.

```
kubectl create namespace mwaa
```

## Create a role for the `mwaa` namespace
<a name="eksctl-role"></a>

After you create the namespace, create a role and role binding for an Amazon MWAA user on Amazon EKS that can run pods in the `mwaa` namespace. If you used a different name for the namespace, replace `mwaa` in `-n mwaa` with the name that you used.

```
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role
rules:
  - apiGroups:
      - ""
      - "apps"
      - "batch"
      - "extensions"
    resources:
      - "jobs"
      - "pods"
      - "pods/attach"
      - "pods/exec"
      - "pods/log"
      - "pods/portforward"
      - "secrets"
      - "services"
    verbs:
      - "create"
      - "delete"
      - "describe"
      - "get"
      - "list"
      - "patch"
      - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
subjects:
  - kind: User
    name: mwaa-service
roleRef:
  kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
```

Confirm that the new role can access the Amazon EKS cluster by running the following command. Be sure to use the correct name if you did not use *mwaa*:

```
kubectl get pods -n mwaa --as mwaa-service
```

The command returns the following message:

```
No resources found in mwaa namespace.
```
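If you use a namespace or user name other than the ones in this section, it can help to generate the role and role binding instead of editing the manifest by hand. The following is a minimal sketch that builds the same two objects as a Kubernetes `List` and prints them as JSON, which `kubectl apply -f` also accepts. The function name `mwaa_rbac_manifest` and its parameters are assumptions made for illustration.

```python
import json


def mwaa_rbac_manifest(namespace="mwaa", user="mwaa-service"):
    """Build the Role and RoleBinding from this section as one Kubernetes List object."""
    role = {
        "kind": "Role",
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "metadata": {"name": "mwaa-role", "namespace": namespace},
        "rules": [
            {
                "apiGroups": ["", "apps", "batch", "extensions"],
                "resources": [
                    "jobs", "pods", "pods/attach", "pods/exec",
                    "pods/log", "pods/portforward", "secrets", "services",
                ],
                "verbs": ["create", "delete", "describe", "get", "list", "patch", "update"],
            }
        ],
    }
    binding = {
        "kind": "RoleBinding",
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "metadata": {"name": "mwaa-role-binding", "namespace": namespace},
        "subjects": [{"kind": "User", "name": user}],
        "roleRef": {
            "kind": "Role",
            "name": "mwaa-role",
            "apiGroup": "rbac.authorization.k8s.io",
        },
    }
    return {"apiVersion": "v1", "kind": "List", "items": [role, binding]}


if __name__ == "__main__":
    # Example: python3 rbac.py | kubectl apply -f -
    print(json.dumps(mwaa_rbac_manifest(), indent=2))
```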

## Create and attach an IAM role for the Amazon EKS cluster
<a name="eksctl-iam-role"></a>

You must create an IAM role and then bind it to the Amazon EKS (k8s) cluster so that it can be used for authentication through IAM. The role is used only to log in to the cluster, and does not have any permissions for the console or API calls.

Create a new role for the Amazon MWAA environment using the steps in [Amazon MWAA execution role](mwaa-create-role.md). However, instead of creating and attaching the policies described in that topic, attach the following policy:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:environment/${MWAA_ENV_NAME}"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
            "arn:aws:logs:us-east-1:111122223333:log-group:airflow-${MWAA_ENV_NAME}-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111122223333:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                    "sqs.us-east-1.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:us-east-1:111122223333:cluster/${EKS_CLUSTER_NAME}"
        }
    ]
}
```
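The policy above contains several values that depend on your account: the Region, the account ID, the environment name, the bucket name, and the cluster name. The following is a minimal sketch that derives the resource ARNs used in the policy from those values, so that you can compare them with the statements before you attach the policy. The function name `policy_resource_arns` and the dictionary keys are assumptions made for illustration.

```python
def policy_resource_arns(region, account_id, env_name, bucket, cluster_name):
    """Return the resource ARNs that the example policy refers to, keyed by purpose."""
    return {
        "environment": f"arn:aws:airflow:{region}:{account_id}:environment/{env_name}",
        "bucket": f"arn:aws:s3:::{bucket}",
        "bucket_objects": f"arn:aws:s3:::{bucket}/*",
        "log_groups": f"arn:aws:logs:{region}:{account_id}:log-group:airflow-{env_name}-*",
        "celery_queues": f"arn:aws:sqs:{region}:*:airflow-celery-*",
        "eks_cluster": f"arn:aws:eks:{region}:{account_id}:cluster/{cluster_name}",
    }


if __name__ == "__main__":
    # Placeholder values; replace them with your own.
    arns = policy_resource_arns(
        "us-east-1", "111122223333", "MyEnvironment", "my-mwaa-bucket", "mwaa-eks"
    )
    for purpose, arn in arns.items():
        print(f"{purpose}: {arn}")
```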

After you create the role, edit your Amazon MWAA environment to use it as the environment's execution role. You select the execution role under **Permissions** when you edit the environment.

**Known issues:**
+ There is a known issue with role ARNs that contain subpaths: they cannot authenticate with Amazon EKS. The workaround is to create the service role manually rather than using the one that Amazon MWAA creates. To learn more, refer to [Roles with paths do not work when the path is included in their ARN in the aws-auth configmap](https://github.com/kubernetes-sigs/aws-iam-authenticator/issues/268).
+ If the Amazon MWAA service listing is not available in IAM, you need to choose an alternate service policy, such as Amazon EC2, and then update the role's trust policy to match the following:

  ```
  {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": [
            "airflow-env.amazonaws.com",
            "airflow.amazonaws.com"
          ]
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }
  ```

  To learn more, refer to [How to use trust policies with IAM roles](https://aws.amazon.com/blogs/security/how-to-use-trust-policies-with-iam-roles/).

## Create the requirements.txt file
<a name="eksctl-requirements"></a>

To use the sample code in this section, add the following packages to your `requirements.txt`. To learn more, refer to [Installing Python dependencies](working-dags-dependencies.md).

```
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
```

## Create an identity mapping for Amazon EKS
<a name="eksctl-identity-map"></a>

In the following command, use the ARN of the role you created to create an identity mapping for Amazon EKS. Change the Region *us-east-1* to the Region where you created the environment, and replace the role ARN, including *mwaa-execution-role*, with the ARN of your environment's execution role.

```
eksctl create iamidentitymapping \
--region us-east-1 \
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012:role/mwaa-execution-role \
--username mwaa-service
```
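The known issue described earlier concerns role ARNs that include a path, such as `role/service-role/my-role`. The following is a minimal sketch, assuming that mapping the same role by its ARN without the path is acceptable in your setup, of how to derive that form before passing it to `--arn`. The function name `strip_role_path` is an assumption made for illustration.

```python
def strip_role_path(role_arn):
    """Return the role ARN with any path between `role/` and the role name removed."""
    prefix, separator, resource = role_arn.partition(":role/")
    if not separator or not resource:
        raise ValueError(f"not an IAM role ARN: {role_arn!r}")
    # The role name is the last path segment
    role_name = resource.rsplit("/", 1)[-1]
    return f"{prefix}:role/{role_name}"


if __name__ == "__main__":
    print(strip_role_path("arn:aws:iam::123456789012:role/service-role/mwaa-execution-role"))
```

An ARN that has no path is returned unchanged.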

## Create the `kubeconfig`
<a name="eksctl-kube-config"></a>

Use the following command to create the `kubeconfig`:

```
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
```

If you used a specific profile when you ran `update-kubeconfig`, you need to remove the `env:` section added to the `kube_config.yaml` file so that it works correctly with Amazon MWAA. To do so, delete the following from the file and then save it:

```
env:
- name: AWS_PROFILE
  value: profile_name
```
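Removing the `env:` section can also be scripted. The following is a minimal, line-based sketch that drops an `env:` key and the list nested under it from the text of a kubeconfig file. It assumes the layout shown above, where the list items are indented at least as far as the `env:` key, and it does not parse YAML in general. The function name `drop_env_block` is an assumption made for illustration.

```python
def drop_env_block(kubeconfig_text):
    """Remove each `env:` key and the list nested under it from kubeconfig text."""
    kept = []
    env_indent = None  # indentation of the `env:` key while its block is being skipped
    for line in kubeconfig_text.splitlines():
        stripped = line.lstrip(" ")
        indent = len(line) - len(stripped)
        if env_indent is not None:
            # Lines that belong to the block are indented deeper than `env:`,
            # or are list items at the same indentation.
            inside = indent > env_indent or (
                indent == env_indent and stripped.startswith("- ")
            )
            if stripped and inside:
                continue
            env_indent = None
        if stripped.rstrip() == "env:":
            env_indent = indent
            continue
        kept.append(line)
    return "\n".join(kept) + "\n"


if __name__ == "__main__":
    with open("kube_config.yaml") as source:
        cleaned = drop_env_block(source.read())
    with open("kube_config.yaml", "w") as target:
        target.write(cleaned)
```

Review the resulting file before you upload it, because a line-based filter cannot account for every possible kubeconfig layout.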

## Create a DAG
<a name="eksctl-create-dag"></a>

Use the following code example to create a Python file, such as `mwaa_pod_example.py`, for the DAG.

```
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

# Use a kube_config file stored in the dags folder of the environment's Amazon S3 bucket
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "ls"],
    labels={"foo": "bar"},
    name="mwaa-pod-test",
    task_id="pod-task",
    get_logs=True,
    dag=dag,
    is_delete_operator_pod=False,
    config_file=kube_config_path,
    in_cluster=False,
    cluster_context='aws'
)
```

## Add the DAG and `kube_config.yaml` to the Amazon S3 bucket
<a name="eksctl-dag-bucket"></a>

Put the DAG you created and the `kube_config.yaml` file into the Amazon S3 bucket for the Amazon MWAA environment. You can put files into your bucket using either the Amazon S3 console or the AWS Command Line Interface.

## Enable and trigger the example
<a name="eksctl-trigger-pod"></a>

In Apache Airflow, enable the example and then trigger it.

After it runs and completes successfully, use the following command to verify the pod:

```
kubectl get pods -n mwaa
```

You get output similar to the following:

```
NAME                                             READY   STATUS      RESTARTS   AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888   0/1     Completed   0          2m23s
```

You can then verify the output of the pod with the following command. Replace the name value with the value returned from the previous command:

```
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888
```

# Connecting to Amazon ECS using the `ECSOperator`
<a name="samples-ecs-operator"></a>

This topic describes how you can use the `ECSOperator` to connect to an Amazon Elastic Container Service (Amazon ECS) container from Amazon MWAA. In the following steps, you'll add the required permissions to your environment's execution role, use a CloudFormation template to create an Amazon ECS Fargate cluster, and finally create and upload a DAG that connects to your new cluster.

**Topics**
+ [Version](#samples-ecs-operator-version)
+ [Prerequisites](#samples-ecs-operator-prereqs)
+ [Permissions](#samples-ecs-operator-permissions)
+ [Create an Amazon ECS cluster](#create-cfn-template)
+ [Code sample](#samples-ecs-operator-code)

## Version
<a name="samples-ecs-operator-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-ecs-operator-prereqs"></a>

To use the sample code on this page, you'll need the following:
+ An [Amazon MWAA environment](get-started.md).

## Permissions
<a name="samples-ecs-operator-permissions"></a>
+ The execution role for your environment needs permission to run tasks in Amazon ECS. You can either attach the [AmazonECS_FullAccess](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonECS_FullAccess$jsonEditor) AWS-managed policy to your execution role, or create and attach the following policy to your execution role.

  ```
  {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Sid": "VisualEditor0",
              "Effect": "Allow",
              "Action": [
                  "ecs:RunTask",
                  "ecs:DescribeTasks"
              ],
              "Resource": "*"
          },
          {
              "Action": "iam:PassRole",
              "Effect": "Allow",
              "Resource": [
                  "*"
              ],
              "Condition": {
                  "StringLike": {
                      "iam:PassedToService": "ecs-tasks.amazonaws.com"
                  }
              }
          }
      ]
  }
  ```

+ In addition to adding the required permissions to run tasks in Amazon ECS, you must also modify the CloudWatch Logs policy statement in your Amazon MWAA execution role to allow access to the Amazon ECS task log group, as listed in the following. The Amazon ECS log group is created by the CloudFormation template in [Create an Amazon ECS cluster](#create-cfn-template).

  ```
  {
    "Effect": "Allow",
    "Action": [
      "logs:CreateLogStream",
      "logs:CreateLogGroup",
      "logs:PutLogEvents",
      "logs:GetLogEvents",
      "logs:GetLogRecord",
      "logs:GetLogGroupFields",
      "logs:GetQueryResults"
    ],
    "Resource": [
      "arn:aws:logs:us-east-1:123456789012:log-group:airflow-environment-name-*",
      "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*"
    ]
  }
  ```

For more information about the Amazon MWAA execution role, and how to attach a policy, refer to [Execution role](mwaa-create-role.md).

## Create an Amazon ECS cluster
<a name="create-cfn-template"></a>

Using the following CloudFormation template, you build an Amazon ECS Fargate cluster to use with your Amazon MWAA workflow. For more information, refer to [Creating a task definition](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-task-definition) in the *Amazon Elastic Container Service Developer Guide*.

1. Create a JSON file with the following code and save it as `ecs-mwaa-cfn.json`.

   ```
   {
       "AWSTemplateFormatVersion": "2010-09-09",
       "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.",
       "Parameters": {
           "VpcId": {
               "Type": "AWS::EC2::VPC::Id",
               "Description": "Select a VPC that allows instances access to ECR, as used with MWAA."
           },
           "SubnetIds": {
               "Type": "List<AWS::EC2::Subnet::Id>",
               "Description": "Select at least two private subnets in your selected VPC, as used with MWAA."
           },
           "SecurityGroups": {
               "Type": "List<AWS::EC2::SecurityGroup::Id>",
               "Description": "Select at least one security group in your selected VPC, as used with MWAA."
           }
       },
       "Resources": {
           "Cluster": {
               "Type": "AWS::ECS::Cluster",
               "Properties": {
                   "ClusterName": {
                       "Fn::Sub": "${AWS::StackName}-cluster"
                   }
               }
           },
           "LogGroup": {
               "Type": "AWS::Logs::LogGroup",
               "Properties": {
                   "LogGroupName": {
                       "Ref": "AWS::StackName"
                   },
                   "RetentionInDays": 30
               }
           },
           "ExecutionRole": {
               "Type": "AWS::IAM::Role",
               "Properties": {
                   "AssumeRolePolicyDocument": {
                       "Statement": [
                           {
                               "Effect": "Allow",
                               "Principal": {
                                   "Service": "ecs-tasks.amazonaws.com"
                               },
                               "Action": "sts:AssumeRole"
                           }
                       ]
                   },
                   "ManagedPolicyArns": [
                       "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
                   ]
               }
           },
           "TaskDefinition": {
               "Type": "AWS::ECS::TaskDefinition",
               "Properties": {
                   "Family": {
                       "Fn::Sub": "${AWS::StackName}-task"
                   },
                   "Cpu": 2048,
                   "Memory": 4096,
                   "NetworkMode": "awsvpc",
                   "ExecutionRoleArn": {
                       "Ref": "ExecutionRole"
                   },
                   "ContainerDefinitions": [
                       {
                           "Name": {
                               "Fn::Sub": "${AWS::StackName}-container"
                           },
                           "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
                           "PortMappings": [
                               {
                                   "Protocol": "tcp",
                                   "ContainerPort": 8080,
                                   "HostPort": 8080
                               }
                           ],
                           "LogConfiguration": {
                               "LogDriver": "awslogs",
                               "Options": {
                                   "awslogs-region": {
                                       "Ref": "AWS::Region"
                                   },
                                   "awslogs-group": {
                                       "Ref": "LogGroup"
                                   },
                                   "awslogs-stream-prefix": "ecs"
                               }
                           }
                       }
                   ],
                   "RequiresCompatibilities": [
                       "FARGATE"
                   ]
               }
           },
           "Service": {
               "Type": "AWS::ECS::Service",
               "Properties": {
                   "ServiceName": {
                       "Fn::Sub": "${AWS::StackName}-service"
                   },
                   "Cluster": {
                       "Ref": "Cluster"
                   },
                   "TaskDefinition": {
                       "Ref": "TaskDefinition"
                   },
                   "DesiredCount": 1,
                   "LaunchType": "FARGATE",
                   "PlatformVersion": "1.3.0",
                   "NetworkConfiguration": {
                       "AwsvpcConfiguration": {
                           "AssignPublicIp": "ENABLED",
                           "Subnets": {
                               "Ref": "SubnetIds"
                           },
                           "SecurityGroups": {
                               "Ref": "SecurityGroups"
                           }
                       }
                   }
               }
           }
       }
   }
   ```

1. In your command prompt, use the following AWS CLI command to create a new stack. You must replace the values `SecurityGroups` and `SubnetIds` with values for your Amazon MWAA environment's security groups and subnets.

   ```
   aws cloudformation create-stack \
   --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \
   --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \
   ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-2 \
   --capabilities CAPABILITY_IAM
   ```

   Alternatively, you can use the following shell script. The script retrieves the required values for your environment's security groups and subnets using the [`get-environment`](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/mwaa/get-environment.html) AWS CLI command, then creates the stack accordingly. To run the script, do the following.

   1. Copy and save the script as `ecs-stack-helper.sh` in the same directory as your CloudFormation template.

      ```
      #!/bin/bash
      
      joinByString() {
        local separator="$1"
        shift
        local first="$1"
        shift
        printf "%s" "$first" "${@/#/$separator}"
      }
      
      # $1 is the environment name, $2 is the name of the stack to create
      response=$(aws mwaa get-environment --name "$1")
      
      securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]')
      subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]'))
      
      aws cloudformation create-stack --stack-name "$2" --template-body file://ecs-mwaa-cfn.json \
      --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \
      ParameterKey=SubnetIds,ParameterValue=$subnetIds \
      --capabilities CAPABILITY_IAM
      ```

   1. Run the script using the following commands. Replace `environment-name` and `stack-name` with your information.

      ```
      chmod +x ecs-stack-helper.sh
      ./ecs-stack-helper.sh environment-name stack-name
      ```

   If successful, you'll see the following output showing your new CloudFormation stack ID.

   ```
   {
     "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
   }
   ```
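The helper script above reads the security groups and subnets from the `get-environment` response and turns them into stack parameters. The following is a minimal sketch of the same transformation in Python, operating on the parsed JSON response. It returns the parameters as a list of `ParameterKey` and `ParameterValue` pairs, a JSON form in which the commas inside a value do not need escaping. The function name `stack_parameters` is an assumption made for illustration.

```python
import json


def stack_parameters(get_environment_response):
    """Build CloudFormation stack parameters from a parsed `get-environment` response."""
    network = get_environment_response["Environment"]["NetworkConfiguration"]
    return [
        {
            "ParameterKey": "SecurityGroups",
            "ParameterValue": ",".join(network["SecurityGroupIds"]),
        },
        {
            "ParameterKey": "SubnetIds",
            "ParameterValue": ",".join(network["SubnetIds"]),
        },
    ]


if __name__ == "__main__":
    # Placeholder response with the fields that the helper script reads
    response = {
        "Environment": {
            "NetworkConfiguration": {
                "SecurityGroupIds": ["sg-11111111111111111"],
                "SubnetIds": ["subnet-33333333333333333", "subnet-44444444444444444"],
            }
        }
    }
    print(json.dumps(stack_parameters(response), indent=2))
```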

After your CloudFormation stack is completed and AWS has provisioned your Amazon ECS resources, you're ready to create and upload your DAG.

## Code sample
<a name="samples-ecs-operator-code"></a>

1. Open a command prompt, and navigate to the directory where your DAG code is stored. For example:

   ```
   cd dags
   ```

1. Copy the contents of the following code sample and save locally as `mwaa-ecs-operator.py`, then upload your new DAG to Amazon S3.

   ```
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.ecs import ECSOperator
   from airflow.utils.dates import days_ago
   import boto3
   
   CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information.
   CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information.
   LAUNCH_TYPE="FARGATE"
   
   with DAG(
       dag_id = "ecs_fargate_dag",
       schedule_interval=None,
       catchup=False,
       start_date=days_ago(1)
   ) as dag:
       client=boto3.client('ecs')
       services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE)
       service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns'])
   
       ecs_operator_task = ECSOperator(
           task_id = "ecs_operator_task",
           dag=dag,
           cluster=CLUSTER_NAME,
           task_definition=service['services'][0]['taskDefinition'],
           launch_type=LAUNCH_TYPE,
           overrides={
               "containerOverrides":[
                   {
                       "name":CONTAINER_NAME,
                       "command":["ls", "-l", "/"],
                   },
               ],
           },
   
           network_configuration=service['services'][0]['networkConfiguration'],
           awslogs_group="mwaa-ecs-zero",
           awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}",
       )
   ```
**Note**  
In the example DAG, you might need to change the value of `awslogs_group` to the name of your Amazon ECS task log group. The example assumes a log group named `mwaa-ecs-zero`. For `awslogs_stream_prefix`, use the Amazon ECS task log stream prefix. The example assumes the log stream prefix `ecs`.

1. Run the following AWS CLI command to copy the DAG to your environment's bucket, then trigger the DAG using the Apache Airflow UI.

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. If successful, you'll get output similar to the following in the task logs for `ecs_operator_task` in the `ecs_fargate_dag` DAG:

   ```
   [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
   Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
   [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
   {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
   [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x---   2 root root 4096 Apr  9  2019 root
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
   [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
   .
   .
   .
   [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed
   ```
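The sample DAG reads the task definition and network configuration from the first entry of the `describe_services` response. If the cluster name or launch type does not match any service, that lookup fails with an `IndexError` when the DAG is parsed. The following is a minimal sketch of a helper that performs the same lookup on the response dictionary and raises a clearer error. The function name `run_task_settings` and the keys of the returned dictionary are assumptions made for illustration.

```python
def run_task_settings(describe_services_response):
    """Extract the values the sample DAG passes to the operator from a describe_services response."""
    services = describe_services_response.get("services", [])
    if not services:
        raise ValueError("no Amazon ECS services found; check the cluster name and launch type")
    first = services[0]
    return {
        "task_definition": first["taskDefinition"],
        "network_configuration": first["networkConfiguration"],
    }


if __name__ == "__main__":
    # Placeholder response with only the fields that the sample DAG reads
    response = {
        "services": [
            {
                "taskDefinition": "arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1",
                "networkConfiguration": {
                    "awsvpcConfiguration": {
                        "subnets": ["subnet-33333333333333333"],
                        "securityGroups": ["sg-11111111111111111"],
                        "assignPublicIp": "ENABLED",
                    }
                },
            }
        ]
    }
    print(run_task_settings(response))
```

In the DAG, the returned values could replace the two `service['services'][0][...]` expressions.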

# Using dbt with Amazon MWAA
<a name="samples-dbt"></a>

This topic demonstrates how you can use dbt and Postgres with Amazon MWAA. In the following steps, you'll add the required dependencies to your `requirements.txt`, and upload a sample dbt project to your environment's Amazon S3 bucket. Then, you'll use a sample DAG to verify that Amazon MWAA has installed the dependencies, and finally use the `BashOperator` to run the dbt project.

**Topics**
+ [Version](#samples-dbt-version)
+ [Prerequisites](#samples-dbt-prereqs)
+ [Dependencies](#samples-dbt-dependencies)
+ [Upload a dbt project to Amazon S3](#samples-dbt-upload-project)
+ [Use a DAG to verify dbt dependency installation](#samples-dbt-test-dependencies)
+ [Use a DAG to run a dbt project](#samples-dbt-run-project)

## Version
<a name="samples-dbt-version"></a>

You can use the code example on this page with **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) and **Apache Airflow v3** in [Python 3.11](https://peps.python.org/pep-0664/).

## Prerequisites
<a name="samples-dbt-prereqs"></a>

Before you can complete the following steps, you'll need the following:
+ An [Amazon MWAA environment](get-started.md) using Apache Airflow v2.2.2. This sample was written and tested with v2.2.2. You might need to modify the sample to use it with other Apache Airflow versions.
+ A sample dbt project. To get started using dbt with Amazon MWAA, you can create a fork and clone the [dbt starter project](https://github.com/dbt-labs/dbt-starter-project) from the dbt-labs GitHub repository.

## Dependencies
<a name="samples-dbt-dependencies"></a>

To use Amazon MWAA with dbt, add the following startup script to your environment. To learn more, refer to [Using a startup script with Amazon MWAA](using-startup-script.md).

```
#!/bin/bash
			
  if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]]
    then
      exit 0
  fi
			
  echo "------------------------------"
  echo "Installing virtual Python env"
  echo "------------------------------"
			
  pip3 install --upgrade pip
			
  echo "Current Python version:"
  python3 --version 
  echo "..."
			
  sudo pip3 install --user virtualenv
  sudo mkdir python3-virtualenv
  cd python3-virtualenv
  sudo python3 -m venv dbt-env
  sudo chmod -R 777 *
			
  echo "------------------------------"
  echo "Activating venv in $(pwd)/dbt-env"
  echo "------------------------------"
			
  source dbt-env/bin/activate
  pip3 list
			
  echo "------------------------------"
  echo "Installing libraries..."
  echo "------------------------------"
			
  # do not use sudo, as it will install outside the venv
  pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1
			
  echo "------------------------------"
  echo "Venv libraries..."
  echo "------------------------------"
			
  pip3 list
  dbt --version
			
  echo "------------------------------"
  echo "Deactivating venv..."
  echo "------------------------------"
			
  deactivate
```

In the following sections, you'll upload your dbt project directory to Amazon S3 and run a DAG that validates whether Amazon MWAA has successfully installed the required dbt dependencies.

## Upload a dbt project to Amazon S3
<a name="samples-dbt-upload-project"></a>

To use a dbt project with your Amazon MWAA environment, you can upload the entire project directory to your environment's `dags` folder. When the environment updates, Amazon MWAA downloads the dbt directory to the local `/usr/local/airflow/dags/` folder.

**To upload a dbt project to Amazon S3**

1. Navigate to the directory where you cloned the dbt starter project.

1. Run the following AWS CLI command, which uses the `--recursive` parameter to copy the contents of the project to your environment's `dags` folder. The command creates a sub-directory called `dbt` that you can use for all of your dbt projects, and a sub-directory within `dbt` for this specific starter project. If the `dbt` sub-directory already exists, the project files are copied into it, and a new directory is not created.

   ```
   aws s3 cp dbt-starter-project s3://amzn-s3-demo-bucket/dags/dbt/dbt-starter-project --recursive
   ```

   You can use different names for project sub-directories to organize multiple dbt projects within the parent `dbt` directory.
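
If you prefer to script the upload in Python rather than use the AWS CLI, the recursive copy above can be approximated with boto3. The following is a minimal sketch, not the documented method. The helper names `build_upload_plan` and `upload_project` are hypothetical, the bucket name is the placeholder from the command above, and the sketch assumes that boto3 is installed and that AWS credentials are configured.

```python
import os


def build_upload_plan(project_dir, key_prefix):
    """Map every file under project_dir to an S3 key under key_prefix.

    Hypothetical helper that mirrors the key layout produced by a recursive copy.
    """
    prefix = key_prefix.rstrip("/")
    plan = []
    for root, _dirs, files in os.walk(project_dir):
        for name in files:
            local_path = os.path.join(root, name)
            # Use forward slashes in S3 keys regardless of the local OS.
            relative = os.path.relpath(local_path, project_dir).replace(os.sep, "/")
            plan.append((local_path, f"{prefix}/{relative}"))
    # Sort by key so the upload order is predictable.
    return sorted(plan, key=lambda pair: pair[1])


def upload_project(project_dir, bucket, key_prefix):
    """Upload the planned files. Assumes boto3 and AWS credentials are available."""
    import boto3  # imported here so the planning helper works without boto3

    s3 = boto3.client("s3")
    for local_path, key in build_upload_plan(project_dir, key_prefix):
        s3.upload_file(local_path, bucket, key)


# Example call (not executed here):
# upload_project("dbt-starter-project", "amzn-s3-demo-bucket", "dags/dbt/dbt-starter-project")
```

Separating the planning step from the upload makes it possible to print and review the target keys before anything is written to the bucket.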

## Use a DAG to verify dbt dependency installation
<a name="samples-dbt-test-dependencies"></a>

The following DAG uses a `BashOperator` and a bash command to verify whether the startup script has successfully installed the dbt dependencies in the virtual environment.

```
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
    # Print the dbt version from the virtual environment created by the startup script
    cli_command = BashOperator(
        task_id="bash_command",
        bash_command="/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version"
    )
```

Do the following to access task logs and verify that dbt and its dependencies have been installed.

1. Navigate to the Amazon MWAA console, then choose **Open Airflow UI** from the list of available environments.

1. On the Apache Airflow UI, find the `dbt-installation-test` DAG from the list, then choose the date in the `Last Run` column to open the last successful task.

1. Using **Graph View**, choose the `bash_command` task to open the task instance details.

1. Choose **Log** to open the task logs, then verify that the logs list the dbt version installed by the startup script.

## Use a DAG to run a dbt project
<a name="samples-dbt-run-project"></a>

The following DAG uses a `BashOperator` to copy the dbt projects you uploaded to Amazon S3 from the local `/usr/local/airflow/dags/` directory to the write-accessible `/tmp` directory, then runs the dbt project. The bash commands assume a starter dbt project named `dbt-starter-project`. Modify the directory name according to the name of your project directory.

```
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

import os

DAG_ID = os.path.basename(__file__).replace(".py", "")

# assumes all files are in a subfolder of DAGs called dbt

with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
    # Activate the venv, copy the project to /tmp, run it, print the log, then clean up
    cli_command = BashOperator(
        task_id="bash_command",
        bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\
        cp -R /usr/local/airflow/dags/dbt /tmp;\
        echo 'listing project files:';\
        ls -R /tmp;\
        cd /tmp/dbt/dbt-starter-project;\
        /usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/dbt-starter-project --profiles-dir ..;\
        cat /tmp/dbt_logs/dbt.log;\
        rm -rf /tmp/dbt/dbt-starter-project"
    )
```
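
Because the project directory name appears several times in the bash command, one option is to build the command string from a single variable. The following is a minimal sketch under that assumption. `build_dbt_command`, `VENV_BIN`, and the function's parameters are hypothetical names, not part of Amazon MWAA or Apache Airflow, and the paths mirror the ones used in the DAG above.

```python
# Path to the virtual environment created by the startup script (as used in the DAG above).
VENV_BIN = "/usr/local/airflow/python3-virtualenv/dbt-env/bin"


def build_dbt_command(project_name, dags_dbt_dir="/usr/local/airflow/dags/dbt", work_dir="/tmp"):
    """Return a bash command that copies the dbt folder to work_dir and runs one project."""
    project_dir = f"{work_dir}/dbt/{project_name}"
    steps = [
        f"source {VENV_BIN}/activate",
        f"cp -R {dags_dbt_dir} {work_dir}",
        f"cd {project_dir}",
        # profiles.yml is assumed to sit one level above the project, as in the DAG above.
        f"{VENV_BIN}/dbt run --project-dir {project_dir} --profiles-dir ..",
        f"rm -rf {project_dir}",
    ]
    # "&&" stops at the first failing step, unlike ";" which continues regardless.
    return " && ".join(steps)
```

You could pass the result to the operator as `bash_command=build_dbt_command("dbt-starter-project")`. Joining the steps with `&&` is a design choice in this sketch: a failed `dbt run` then fails the task, at the cost of skipping the final cleanup step.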

## AWS blogs and tutorials
<a name="samples-blogs-tutorials"></a>
+ [Working with Amazon EKS and Amazon MWAA for Apache Airflow v2.x](https://dev.to/aws/working-with-amazon-eks-and-amazon-managed-workflows-for-apache-airflow-v2-x-k12)