Limpeza do SQL banco de dados Aurora Postgre em um ambiente Amazon MWAA - Amazon Managed Workflows for Apache Airflow

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Limpeza do SQL banco de dados Aurora Postgre em um ambiente Amazon MWAA

O Amazon Managed Workflows para Apache Airflow usa um banco de dados Aurora Postgre como SQL banco de dados de metadados do Apache Airflow, onde as execuções e as instâncias das tarefas são armazenadas. DAG O código de exemplo a seguir limpa periodicamente as entradas do banco de dados Aurora SQL Postgre dedicado para seu ambiente Amazon. MWAA

Version (Versão)

Pré-requisitos

Para usar o código de amostra nesta página, você precisará do seguinte:

Dependências

Exemplo de código

O seguinte DAG limpa o banco de dados de metadados das tabelas especificadas em. TABLES_TO_CLEAN O exemplo exclui dados das tabelas especificadas com mais de 30 dias. Para ajustar até que ponto as entradas são excluídas, MAX_AGE_IN_DAYS defina um valor diferente.

Apache Airflow v2.4 and later
from airflow import DAG from airflow.models.param import Param from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago from datetime import datetime, timedelta # Note: Database commands may time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change # timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met. MAX_AGE_IN_DAYS = 30 # To clean specific tables, please provide a comma-separated list per # https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean # A value of None will clean all tables TABLES_TO_CLEAN = None with DAG( dag_id="clean_db_dag", schedule_interval=None, catchup=False, start_date=days_ago(1), params={ "timestamp": Param( default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"), type="string", minLength=1, maxLength=255, ), } ) as dag: if TABLES_TO_CLEAN: bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes" else: bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes" cli_command = BashOperator( task_id="bash_command", bash_command=bash_command )
Apache Airflow v2.2 and earlier
from airflow import settings from airflow.utils.dates import days_ago from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom from airflow.decorators import dag, task from airflow.utils.dates import days_ago from time import sleep from airflow.version import version major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1]) if major_version >= 2 and minor_version >= 6: from airflow.jobs.job import Job else: # The BaseJob class was renamed as of Apache Airflow v2.6 from airflow.jobs.base_job import BaseJob as Job # Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database. MAX_AGE_IN_DAYS = 30 MIN_AGE_IN_DAYS = 0 DECREMENT = -7 # This is a list of (table, time) tuples. # table = the table to clean in the metadata database # time = the column in the table associated to the timestamp of an entry # or None if not applicable. TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat], [TaskInstance, TaskInstance.execution_date], [TaskReschedule, TaskReschedule.execution_date], [DagTag, None], [DagModel, DagModel.last_parsed_time], [DagRun, DagRun.execution_date], [ImportError, ImportError.timestamp], [Log, Log.dttm], [SlaMiss, SlaMiss.execution_date], [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], [XCom, XCom.execution_date], ] @task() def cleanup_db_fn(x): session = settings.Session() if x[1]: for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT): earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS) print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...") earliest_date = days_ago(earliest_days_ago) oldest_date = days_ago(oldest_days_ago) query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date) query.delete(synchronize_session= False) session.commit() sleep(5) else: # No time column specified for the table. Delete all entries print("deleting", str(x[0]), "...") query = session.query(x[0]) query.delete(synchronize_session= False) session.commit() session.close() @dag( dag_id="cleanup_db", schedule_interval="@weekly", start_date=days_ago(7), catchup=False, is_paused_upon_creation=False ) def clean_db_dag_fn(): t_last=None for x in TABLES_TO_CLEAN: t=cleanup_db_fn(x) if t_last: t_last >> t t_last = t clean_db_dag = clean_db_dag_fn()