View a markdown version of this page

Aurora PostgreSQL database cleanup on an Amazon MWAA environment - Amazon Managed Workflows for Apache Airflow

Aurora PostgreSQL database cleanup on an Amazon MWAA environment

Amazon Managed Workflows for Apache Airflow uses an Aurora PostgreSQL database as the Apache Airflow metadata database, where DAG runs and task instances are stored. The following sample code periodically clears out entries from the dedicated Aurora PostgreSQL database for your Amazon MWAA environment.

Version

The code samples on this page are specific to Apache Airflow v2 and v3 supported on Amazon MWAA. Refer to the supported Apache Airflow versions.

Prerequisites

To use the sample code on this page, you'll need the following:

Dependencies

To use this code example with Apache Airflow v2, no additional dependencies are required. Use aws-mwaa-docker-images to install Apache Airflow.

Code sample

The following DAG cleans the metadata database for the tables specified in TABLES_TO_CLEAN. The example deletes data from the specified tables that is older than 30 days. To adjust how far back the entries are deleted, set MAX_AGE_IN_DAYS to a different value.

Apache Airflow v3.0.6 to 3.2.1
from datetime import datetime from airflow import DAG from airflow.providers.standard.operators.bash import BashOperator # Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change # timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met. MAX_AGE_IN_DAYS = 30 # To clean specific tables, please provide a comma-separated list per # https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean # A value of None will clean all tables TABLES_TO_CLEAN = None with DAG( dag_id="clean_db_dag", schedule=None, catchup=False, start_date=datetime(2026, 1, 1), ) as dag: tables_flag = f"--tables '{TABLES_TO_CLEAN}' " if TABLES_TO_CLEAN else "" bash_command = ( f"TIMESTAMP=$(date -u -d '{MAX_AGE_IN_DAYS} days ago' '+%Y-%m-%d %H:%M:%S' 2>/dev/null " f"|| date -u -v-{MAX_AGE_IN_DAYS}d '+%Y-%m-%d %H:%M:%S') && " "echo \"Cleaning records before: $TIMESTAMP\" && " "airflow db clean " "--clean-before-timestamp \"$TIMESTAMP\" " f"{tables_flag}" "--skip-archive --yes" ) cli_command = BashOperator( task_id="bash_command", bash_command=bash_command, )
Apache Airflow v2.7.2 to 2.11.0
from airflow import DAG from airflow.models.param import Param from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago from datetime import datetime, timedelta # Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change # timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met. MAX_AGE_IN_DAYS = 30 # To clean specific tables, please provide a comma-separated list per # https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean # A value of None will clean all tables TABLES_TO_CLEAN = None with DAG( dag_id="clean_db_dag", schedule_interval=None, catchup=False, start_date=days_ago(1), params={ "timestamp": Param( default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"), type="string", minLength=1, maxLength=255, ), } ) as dag: if TABLES_TO_CLEAN: bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes" else: bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes" cli_command = BashOperator( task_id="bash_command", bash_command=bash_command )