Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Exportación de metadatos del entorno a archivos CSV en Amazon S3
El siguiente código de muestra indica cómo puede crear un gráfico acíclico dirigido (DAG) que consulte en la base de datos un rango de información de ejecución del DAG y escriba los datos en los archivos .csv
almacenados en Amazon S3.
Quizás desee exportar información de la base de datos Aurora PostgreSQL de su entorno para inspeccionar los datos localmente, archivarlos en un almacenamiento de objetos o combinarlos con herramientas como el operador Amazon S3 a Amazon Redshift
Puede consultar en la base de datos cualquiera de los objetos enumerados en los modelos de Apache Airflow.DagRun
, TaskFail
y TaskInstance
, que proporcionan información relevante para las ejecuciones del DAG.
Versión
-
Puede usar el código de ejemplo que aparece en esta página con Apache Airflow v2 en Python 3.10
.
Requisitos previos
Para usar el código de muestra de esta página, necesitará lo siguiente:
-
Un nuevo bucket de Amazon S3 a donde desea exportar su información de metadatos.
Permisos
Amazon MWAA necesita permiso para que la acción s3:PutObject
de Amazon S3 escriba la información de metadatos consultada en Amazon S3. Añada la siguiente instrucción de política al rol de ejecución de su entorno.
{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::
your-new-export-bucket
" }
Esta política limita el acceso por escrito únicamente ayour-new-export-bucket
.
Requisitos
-
Para usar este código de ejemplo con Apache Airflow v2, no se necesitan dependencias adicionales. El código utiliza la instalación básica de Apache Airflow v2
en su entorno.
Código de ejemplo
Los siguientes pasos describen cómo puede crear un DAG que consulte Aurora PostgreSQL y escriba el resultado en su nuevo bucket de Amazon S3.
-
En su terminal, vaya hasta el directorio en el que está almacenado el código de DAG. Por ejemplo:
cd dags
-
Copie el contenido del siguiente código de ejemplo y guárdelo localmente como
metadata_to_csv.py
. Puede cambiar el valor asignadoMAX_AGE_IN_DAYS
para controlar la antigüedad de los registros más antiguos que el DAG consulta en la base de datos de metadatos.from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
-
Ejecute el siguiente AWS CLI comando para copiar el DAG al bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow.
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
Si se ejecuta correctamente, verá un resultado similar al siguiente en los registros de tareas para la tarea
export_db
:[2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [
your-tasks
] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail : [your-tasks
] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance : [your-tasks
] [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule checkAhora puede acceder a los archivos
.csv
exportados y descargarlos en su nuevo bucket de Amazon S3 en/files/export/
.