from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
# Note: Database commands may time out if they run longer than 5 minutes. If this occurs, increase
# MAX_AGE_IN_DAYS (or trigger with an earlier timestamp parameter) for the initial runs, then reduce
# it on subsequent runs until the desired retention is reached.
MAX_AGE_IN_DAYS = 30
# To clean specific tables, provide a comma-separated string of table names, per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None cleans all tables.
TABLES_TO_CLEAN = None

# Manually triggered DAG (no schedule) that runs `airflow db clean` once, removing metadata from
# before the `timestamp` param without archiving it. It is bound to its own module-level name
# because `dag` is rebound further down this module by `from airflow.decorators import dag`.
with DAG(
    dag_id="clean_db_dag",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            # Computed when the DAG file is parsed: MAX_AGE_IN_DAYS before the parse time.
            default=(datetime.now() - timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
            # The value is substituted into a single-quoted shell argument below, so a "'" would
            # let whoever triggers the DAG inject shell commands. Restrict it to the characters
            # used by timestamps such as "2024-01-31 12:00:00+00:00" or "2024-01-31T12:00:00Z".
            pattern=r"^[0-9A-Za-z:+\-. ]+$",
        ),
    },
) as clean_db_cli_dag:
    # Only the optional --tables flag differs between the two variants of the command.
    tables_flag = (" --tables '" + TABLES_TO_CLEAN + "'") if TABLES_TO_CLEAN else ""
    # `{{ params.timestamp }}` is a Jinja template rendered by Airflow at run time.
    bash_command = (
        "airflow db clean --clean-before-timestamp '{{ params.timestamp }}'"
        + tables_flag
        + " --skip-archive --yes"
    )
    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command,
    )
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep
from airflow.version import version
# Major and minor components of the running Airflow version string (e.g. "2.7.3" -> 2, 7).
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
# Compare (major, minor) as a tuple: checking the two parts separately would send later major
# releases such as 3.0 (minor 0 < 6) down the legacy import path.
if (major_version, minor_version) >= (2, 6):
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job
# Age range, in days before today, that cleanup_db_fn walks through for tables with a timestamp
# column: it starts MAX_AGE_IN_DAYS back (so this sets how far back the cleanup reaches) and moves
# towards MIN_AGE_IN_DAYS in steps of DECREMENT, which is negative so the walk heads towards today.
# These rebind MAX_AGE_IN_DAYS / TABLES_TO_CLEAN from the CLI-based DAG earlier in this module;
# that DAG's bash command has already been built by this point, so it is unaffected.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# Tables handled by the "cleanup_db" DAG, one task per entry, chained in this order.
# Each entry is a two-item list [model, column]:
#   model  - the ORM class whose metadata-database table is cleaned
#   column - the timestamp column used to select rows by age, or None to delete every row
TABLES_TO_CLEAN = [
    [Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None],
    [DagModel, DagModel.last_parsed_time],
    [DagRun, DagRun.execution_date],
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm],
    [SlaMiss, SlaMiss.execution_date],
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date],
    [XCom, XCom.execution_date],
]
@task()
def cleanup_db_fn(x):
    """Delete rows from one Airflow metadata-database table.

    :param x: one ``[model, column]`` entry of ``TABLES_TO_CLEAN``. ``model`` is the
        ORM class whose table is cleaned; ``column`` is its timestamp column, or
        ``None`` when the table has no usable timestamp.

    With a timestamp column, rows whose timestamp lies between ``MAX_AGE_IN_DAYS``
    and ``MIN_AGE_IN_DAYS`` days ago are deleted in windows of ``-DECREMENT`` days,
    oldest window first. The work is committed, followed by a 5-second sleep, after
    each window. Without a timestamp column, every row of the table is deleted in a
    single commit. The session is always closed, even if a delete fails.
    """
    table, time_column = x
    session = settings.Session()
    try:
        if time_column is not None:
            for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
                # Newer edge of this window; clamped so the last window ends at MIN_AGE_IN_DAYS.
                earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
                print(f"deleting {str(table)} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
                earliest_date = days_ago(earliest_days_ago)
                oldest_date = days_ago(oldest_days_ago)
                # days_ago() lies further in the past as its argument grows, so oldest_date is
                # the LOWER bound and earliest_date the UPPER bound. Filtering the other way
                # round selects an empty range and deletes nothing.
                query = (
                    session.query(table)
                    .filter(time_column >= oldest_date)
                    .filter(time_column <= earliest_date)
                )
                query.delete(synchronize_session=False)
                session.commit()
                sleep(5)
        else:
            # No time column specified for the table. Delete all entries
            print("deleting", str(table), "...")
            session.query(table).delete(synchronize_session=False)
            session.commit()
    finally:
        # Release the session even when a delete or commit raises.
        session.close()
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)
def clean_db_dag_fn():
    """Build the "cleanup_db" DAG: one cleanup task per TABLES_TO_CLEAN entry.

    The tasks are chained in list order, each waiting for the previous one.
    """
    cleanup_tasks = [cleanup_db_fn(entry) for entry in TABLES_TO_CLEAN]
    # Pair every task with its successor and make the successor depend on it.
    for upstream, downstream in zip(cleanup_tasks, cleanup_tasks[1:]):
        upstream >> downstream


clean_db_dag = clean_db_dag_fn()