Como exportar metadados do ambiente para arquivos CSV no Amazon S3 - Amazon Managed Workflows for Apache Airflow

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Como exportar metadados do ambiente para arquivos CSV no Amazon S3

O exemplo de código a seguir mostra como é possível criar um gráfico acíclico direcionado (DAG) que consulta o banco de dados em busca de uma variedade de informações de execução do DAG e grava os dados para arquivos .csv armazenados no Amazon S3.

Talvez você queira exportar informações do banco de dados Aurora PostgreSQL do seu ambiente para inspecionar os dados localmente, arquivá-los no armazenamento de objetos ou combiná-los com ferramentas como o operador Amazon S3 para Amazon Redshift e a limpeza do banco de dados, a fim de mover os metadados do Amazon MWAA do ambiente, mas preservá-los para análise futura.

É possível consultar o banco de dados para qualquer um dos objetos listados nos Modelos do Apache Airflow. Esse exemplo de código usa três modelos, DagRun, TaskFail e TaskInstance, que fornecem informações relevantes para executar o DAG.

Version (Versão)

  • É possível usar o exemplo de código nesta página com o Apache Airflow v2 no Python 3.10.

Pré-requisitos

Para usar o código de amostra nesta página, você precisará do seguinte:

Permissões

O Amazon MWAA precisa de permissão para que a ação s3:PutObject do Amazon S3 grave as informações de metadados consultadas no Amazon S3. Adicione a seguinte declaração de política ao perfil de execução do seu ambiente.

{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::your-new-export-bucket" }

Essa política limita o acesso de gravação somente ao your-new-export-bucket.

Requisitos

Exemplo de código

As etapas a seguir descrevem como é possível criar um DAG que consulta o Aurora PostgreSQL e grava o resultado em seu novo bucket do Amazon S3.

  1. Em seu terminal, navegue até o diretório em que seu código DAG está armazenado. Por exemplo:

    cd dags
  2. Copie o conteúdo do exemplo de código a seguir e salve-o localmente como metadata_to_csv.py. É possível alterar o valor atribuído para MAX_AGE_IN_DAYS para controlar a idade dos registros mais antigos que seu DAG consulta no banco de dados de metadados.

    from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
  3. Execute o seguinte comando AWS CLI para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a IU do Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Se for bem-sucedido, você exibirá uma saída semelhante à seguinte nos logs de tarefas da tarefa export_db:

    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    Agora é possível acessar e baixar os arquivos .csv exportados em seu novo bucket do Amazon S3 em /files/export/.