將環境中繼資料匯出到 Amazon S3 上的CSV檔案 - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將環境中繼資料匯出到 Amazon S3 上的CSV檔案

下列程式碼範例顯示如何建立直接無環圖 (DAG),以查詢資料庫中的一系列DAG執行資訊,並將資料寫入 Amazon S3 上存放的.csv檔案。

您可能想要從環境的 Aurora Postgre 資料SQL庫匯出資訊,以便在本機檢查資料、將資料存檔到物件儲存中,或將它們與 Amazon S3 等工具結合到 Amazon Redshift 操作員和資料庫清理,以便將 Amazon 中MWAA繼資料移出環境,但保留它們以供將 future 分析之用。

您可以在資料庫中查詢 Apache 氣流模型中列出的任何物件。此程式碼範例使用三個模型、DagRun、和 TaskFailTaskInstance,這些模型提供與DAG執行相關的資訊。

版本

  • 您可以使用此頁面上的代碼示例與 Apache 氣流 V2Python 3.10

必要條件

若要使用此頁面上的範例程式碼,您需要下列項目:

許可

亞馬遜MWAA需要 Amazon S3 動作的許可,才s3:PutObject能將查詢的中繼資料資訊寫入 Amazon S3。將下列原則陳述式新增至環境的執行角色。

{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::your-new-export-bucket" }

此原則將寫入存取權限限制為僅限 your-new-export-bucket.

要求

  • 若要將此程式碼範例與 Apache Airflow v2 搭配使用,不需要額外的相依性。該代碼使用 Apache 氣流 v2 基本安裝在您的環境。

範例程式碼

下列步驟說明如何建立DAG查詢 Aurora Postgre SQL 並將結果寫入新的 Amazon S3 儲存貯體。

  1. 在終端機中,瀏覽至儲存DAG程式碼的目錄。例如:

    cd dags
  2. 複製下列程式碼範例的內容,並將其儲存為本機metadata_to_csv.py。您可以變更指派給的值,MAX_AGE_IN_DAYS以控制中繼資料資料庫中DAG查詢之最舊記錄的存留時間。

    from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
  3. 執行下列 AWS CLI 命令以複製DAG到您環境的值區,然後DAG使用 Apache Airflow UI 觸發。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果成功,您將在任務的任務日誌中輸出類似以下內export_db容:

    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    您現在可以在中存取和下載新 Amazon S3 儲存貯體中匯出的.csv檔案/files/export/