Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Exportation de métadonnées d'environnement vers CSV des fichiers sur Amazon S3
L'exemple de code suivant montre comment créer un graphe acyclique dirigé (DAG) qui interroge la base de données pour obtenir une série d'informations d'DAGexécution et écrit les données dans des .csv
fichiers stockés sur Amazon S3.
Vous souhaiterez peut-être exporter des informations depuis la SQL base de données Aurora Postgre de votre environnement afin d'inspecter les données localement, de les archiver dans un espace de stockage d'objets ou de les combiner avec des outils tels que l'opérateur Amazon S3 vers Amazon Redshift
Vous pouvez interroger la base de données pour n'importe quel objet répertorié dans les modèles Apache AirflowDagRun
,TaskFail
, etTaskInstance
, qui fournissent des informations relatives aux DAG exécutions.
Version
-
Vous pouvez utiliser l'exemple de code présenté sur cette page avec Apache Airflow v2 en Python 3.10
.
Prérequis
Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
-
Un nouveau compartiment Amazon S3 dans lequel vous souhaitez exporter vos informations de métadonnées.
Autorisations
Amazon MWAA a besoin d'une autorisation pour que l'action s3:PutObject
Amazon S3 puisse écrire les informations de métadonnées demandées dans Amazon S3. Ajoutez la déclaration de politique suivante au rôle d'exécution de votre environnement.
{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::
your-new-export-bucket
" }
Cette politique limite l'accès en écriture uniquement aux your-new-export-bucket
.
Prérequis
-
Pour utiliser cet exemple de code avec Apache Airflow v2, aucune dépendance supplémentaire n'est requise. Le code utilise l'installation de base d'Apache Airflow v2
sur votre environnement.
Exemple de code
Les étapes suivantes décrivent comment créer un fichier DAG qui interroge le Postgre Aurora SQL et écrit le résultat dans votre nouveau compartiment Amazon S3.
-
Dans votre terminal, accédez au répertoire dans lequel votre DAG code est enregistré. Par exemple :
cd dags
-
Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous le nom
metadata_to_csv.py
. Vous pouvez modifier la valeur attribuéeMAX_AGE_IN_DAYS
à pour contrôler l'âge des enregistrements les plus anciens que vos DAG requêtes proviennent de la base de données de métadonnées.from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
-
Exécutez la AWS CLI commande suivante pour le copier dans le compartiment DAG de votre environnement, puis déclenchez-le à l'DAGaide de l'interface utilisateur d'Apache Airflow.
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
En cas de succès, vous obtiendrez un résultat similaire à ce qui suit dans les journaux des tâches associées à la
export_db
tâche :[2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [
your-tasks
] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail : [your-tasks
] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance : [your-tasks
] [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule checkVous pouvez désormais accéder aux
.csv
fichiers exportés et les télécharger dans votre nouveau compartiment Amazon S3 dans/files/export/
.