本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将环境元数据导出到 Amazon S3 上的 CSV 文件
以下代码示例说明如何创建有向无环图(DAG),该图在数据库中查询一系列 DAG 运行信息,并将数据写入存储在 Amazon S3 上的 .csv
文件。
您可能需要从环境的 Aurora PostgreSQL 数据库中导出信息,以便在本地检查数据,将其存档到对象存储中,或者将它们与诸如 Amazon S3 到 Amazon Redshift 运算符
您可以在数据库中查询 Apache Airflow 模型DagRun
、TaskFail
和 TaskInstance
,它们提供与 DAG 运行相关的信息。
版本
-
您可以在 Python 3.10
中将本页上的代码示例与 Apache Airflow v2 一起使用。
先决条件
要使用本页上的示例代码,您需要以下内容:
-
您想要将元数据导出到新的 Amazon S3 存储桶。
权限
Amazon MWAA 需要获得 Amazon S3 操作 s3:PutObject
的权限,才能将查询的元数据信息写入 Amazon S3。将以下策略声明添加到环境的执行角色中。
{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::
your-new-export-bucket
" }
此策略将写入权限仅限于 your-new-export-bucket
。
要求
-
要在 Apache Airflow v2 中使用此代码示例,无需附加依赖项。该代码在环境中使用 Apache Airflow v2 基础版安装
。
代码示例
以下步骤描述了如何创建 DAG,以查询 Aurora PostgreSQL 并将结果写入新的 Amazon S3 存储桶。
-
在您的终端,导航到存储 DAG 代码的目录。例如:
cd dags
-
复制以下代码示例的内容并本地另存为
metadata_to_csv.py
。您可以更改分配给MAX_AGE_IN_DAYS
的值,以控制 DAG 从元数据数据库中查询的最早记录的龄期。from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
-
运行以下 AWS CLI 命令将 DAG 复制到环境的存储桶,然后使用 Apache Airflow UI 触发 DAG。
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
如果成功,输出将类似于在
export_db
任务的任务日志中的以下内容:[2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [
your-tasks
] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail : [your-tasks
] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance : [your-tasks
] [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check现在,您可以在
/files/export/
中的新 Amazon S3 存储桶中访问和下载导出的.csv
文件。