Exportation de métadonnées d'environnement vers CSV des fichiers sur Amazon S3 - Amazon Managed Workflows for Apache Airflow

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exportation de métadonnées d'environnement vers CSV des fichiers sur Amazon S3

L'exemple de code suivant montre comment créer un graphe acyclique dirigé (DAG) qui interroge la base de données pour obtenir une série d'informations d'DAGexécution et écrit les données dans des .csv fichiers stockés sur Amazon S3.

Vous souhaiterez peut-être exporter des informations depuis la SQL base de données Aurora Postgre de votre environnement afin d'inspecter les données localement, de les archiver dans un espace de stockage d'objets ou de les combiner avec des outils tels que l'opérateur Amazon S3 vers Amazon Redshift et le nettoyage de la base de données, afin de déplacer les métadonnées MWAA Amazon hors de l'environnement, tout en les préservant pour une analyse future.

Vous pouvez interroger la base de données pour n'importe quel objet répertorié dans les modèles Apache Airflow. Cet exemple de code utilise trois modèlesDagRun,TaskFail, etTaskInstance, qui fournissent des informations relatives aux DAG exécutions.

Version

  • Vous pouvez utiliser l'exemple de code présenté sur cette page avec Apache Airflow v2 en Python 3.10.

Prérequis

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :

Autorisations

Amazon MWAA a besoin d'une autorisation pour que l'action s3:PutObject Amazon S3 puisse écrire les informations de métadonnées demandées dans Amazon S3. Ajoutez la déclaration de politique suivante au rôle d'exécution de votre environnement.

{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::your-new-export-bucket" }

Cette politique limite l'accès en écriture uniquement aux your-new-export-bucket.

Prérequis

Exemple de code

Les étapes suivantes décrivent comment créer un fichier DAG qui interroge le Postgre Aurora SQL et écrit le résultat dans votre nouveau compartiment Amazon S3.

  1. Dans votre terminal, accédez au répertoire dans lequel votre DAG code est enregistré. Par exemple :

    cd dags
  2. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous le nommetadata_to_csv.py. Vous pouvez modifier la valeur attribuée MAX_AGE_IN_DAYS à pour contrôler l'âge des enregistrements les plus anciens que vos DAG requêtes proviennent de la base de données de métadonnées.

    from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
  3. Exécutez la AWS CLI commande suivante pour le copier dans le compartiment DAG de votre environnement, puis déclenchez-le à l'DAGaide de l'interface utilisateur d'Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. En cas de succès, vous obtiendrez un résultat similaire à ce qui suit dans les journaux des tâches associées à la export_db tâche :

    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    Vous pouvez désormais accéder aux .csv fichiers exportés et les télécharger dans votre nouveau compartiment Amazon S3 dans/files/export/.