Amazon MWAA 環境での Aurora PostgreSQL データベースのクリーンアップ - Amazon Managed Workflows for Apache Airflow

Amazon MWAA 環境での Aurora PostgreSQL データベースのクリーンアップ

Amazon Managed Workflows for Apache Airflow は、Apache Airflow メタデータデータベースとして Aurora PostgreSQL データベースを使用します。このメタデータデータベースには DAG が実行され、タスクインスタンスが保存されます。次のサンプルコードは、Amazon MWAA 環境の専用の Aurora PostgreSQL データベースから定期的にエントリを消去します。

Version

  • このページのコード例は、Python 3.10Apache Airflow v2 と共に使用可能です。

前提条件

このページのサンプルコードを使用するには、以下が必要です。

依存関係

コードサンプル

次の DAG は、TABLES_TO_CLEAN で指定されたテーブルのメタデータデータベースをクリーンアップします。この例では、30 日以上経過した指定されたテーブルからデータを削除します。エントリが削除される距離を調整するには、MAX_AGE_IN_DAYS を別の値に設定します。

Apache Airflow v2.4 and later
from airflow import DAG from airflow.models.param import Param from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago from datetime import datetime, timedelta # Note: Database commands may time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change # timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met. MAX_AGE_IN_DAYS = 30 # To clean specific tables, please provide a comma-separated list per # https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean # A value of None will clean all tables TABLES_TO_CLEAN = None with DAG( dag_id="clean_db_dag", schedule_interval=None, catchup=False, start_date=days_ago(1), params={ "timestamp": Param( default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"), type="string", minLength=1, maxLength=255, ), } ) as dag: if TABLES_TO_CLEAN: bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes" else: bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes" cli_command = BashOperator( task_id="bash_command", bash_command=bash_command )
Apache Airflow v2.2 and earlier
from airflow import settings from airflow.utils.dates import days_ago from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom from airflow.decorators import dag, task from airflow.utils.dates import days_ago from time import sleep from airflow.version import version major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1]) if major_version >= 2 and minor_version >= 6: from airflow.jobs.job import Job else: # The BaseJob class was renamed as of Apache Airflow v2.6 from airflow.jobs.base_job import BaseJob as Job # Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database. MAX_AGE_IN_DAYS = 30 MIN_AGE_IN_DAYS = 0 DECREMENT = -7 # This is a list of (table, time) tuples. # table = the table to clean in the metadata database # time = the column in the table associated to the timestamp of an entry # or None if not applicable. TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat], [TaskInstance, TaskInstance.execution_date], [TaskReschedule, TaskReschedule.execution_date], [DagTag, None], [DagModel, DagModel.last_parsed_time], [DagRun, DagRun.execution_date], [ImportError, ImportError.timestamp], [Log, Log.dttm], [SlaMiss, SlaMiss.execution_date], [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], [XCom, XCom.execution_date], ] @task() def cleanup_db_fn(x): session = settings.Session() if x[1]: for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT): earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS) print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...") earliest_date = days_ago(earliest_days_ago) oldest_date = days_ago(oldest_days_ago) query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date) query.delete(synchronize_session= False) session.commit() sleep(5) else: # No time column specified for the table. Delete all entries print("deleting", str(x[0]), "...") query = session.query(x[0]) query.delete(synchronize_session= False) session.commit() session.close() @dag( dag_id="cleanup_db", schedule_interval="@weekly", start_date=days_ago(7), catchup=False, is_paused_upon_creation=False ) def clean_db_dag_fn(): t_last=None for x in TABLES_TO_CLEAN: t=cleanup_db_fn(x) if t_last: t_last >> t t_last = t clean_db_dag = clean_db_dag_fn()