As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Como exportar metadados do ambiente para arquivos CSV no Amazon S3
O exemplo de código a seguir mostra como é possível criar um gráfico acíclico direcionado (DAG) que consulta o banco de dados em busca de uma variedade de informações de execução do DAG e grava os dados para arquivos .csv
armazenados no Amazon S3.
Talvez você queira exportar informações do banco de dados Aurora PostgreSQL do seu ambiente para inspecionar os dados localmente, arquivá-los no armazenamento de objetos ou combiná-los com ferramentas como o operador Amazon S3 para Amazon Redshift
É possível consultar o banco de dados para qualquer um dos objetos listados nos Modelos do Apache AirflowDagRun
, TaskFail
e TaskInstance
, que fornecem informações relevantes para executar o DAG.
Version (Versão)
-
É possível usar o exemplo de código nesta página com o Apache Airflow v2 no Python 3.10
.
Pré-requisitos
Para usar o código de amostra nesta página, você precisará do seguinte:
-
Um novo bucket do Amazon S3 no qual você deseja exportar suas informações de metadados.
Permissões
O Amazon MWAA precisa de permissão para que a ação s3:PutObject
do Amazon S3 grave as informações de metadados consultadas no Amazon S3. Adicione a seguinte declaração de política ao perfil de execução do seu ambiente.
{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::
your-new-export-bucket
" }
Essa política limita o acesso de gravação somente ao your-new-export-bucket
.
Requisitos
-
Para usar esse exemplo de código com o Apache Airflow v2, nenhuma dependência adicional é necessária. O código usa a instalação básica do Apache Airflow v2
em seu ambiente.
Exemplo de código
As etapas a seguir descrevem como é possível criar um DAG que consulta o Aurora PostgreSQL e grava o resultado em seu novo bucket do Amazon S3.
-
Em seu terminal, navegue até o diretório em que seu código DAG está armazenado. Por exemplo:
cd dags
-
Copie o conteúdo do exemplo de código a seguir e salve-o localmente como
metadata_to_csv.py
. É possível alterar o valor atribuído paraMAX_AGE_IN_DAYS
para controlar a idade dos registros mais antigos que seu DAG consulta no banco de dados de metadados.from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
-
Execute o seguinte comando AWS CLI para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a IU do Apache Airflow.
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
Se for bem-sucedido, você exibirá uma saída semelhante à seguinte nos logs de tarefas da tarefa
export_db
:[2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [
your-tasks
] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail : [your-tasks
] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance : [your-tasks
] [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule checkAgora é possível acessar e baixar os arquivos
.csv
exportados em seu novo bucket do Amazon S3 em/files/export/
.