Uso de dbt con Amazon MWAA - Amazon Managed Workflows para Apache Airflow

Uso de dbt con Amazon MWAA

En este tema se muestra cómo puede utilizar dbt y Postgres con Amazon MWAA. En los siguientes pasos, añadirá las dependencias necesarias a su requirements.txt y cargará un ejemplo de proyecto de dbt en el bucket de Amazon S3 de su entorno. A continuación, utilizará un ejemplo de DAG para comprobar que Amazon MWAA ha instalado las dependencias y, por último, utilizará el BashOperator para ejecutar el proyecto dbt.

Versión

  • Puede usar el código de ejemplo que aparece en esta página con Apache Airflow v2 en Python 3.10.

Requisitos previos

Para completar los siguientes pasos, necesitará lo siguiente:

  • Un entorno de Amazon MWAA que utilice Apache Airflow v2.2.2. Este ejemplo se escribió y probó con la versión 2.2.2. Es posible que tenga que modificar el ejemplo para usarlo con otras versiones de Apache Airflow.

  • Un ejemplo de proyecto de dbt. Para empezar a usar dbt con Amazon MWAA, puede crear una ramificación y clonar el proyecto de inicio de dbt desde el repositorio dbt-labs de GitHub.

Dependencias

Para usar Amazon MWAA con dbt, agregue el siguiente script de inicio a su entorno. Para obtener más información, consulte Using a startup script with Amazon MWAA.

#!/bin/bash if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]] then exit 0 fi echo "------------------------------" echo "Installing virtual Python env" echo "------------------------------" pip3 install --upgrade pip echo "Current Python version:" python3 --version echo "..." sudo pip3 install --user virtualenv sudo mkdir python3-virtualenv cd python3-virtualenv sudo python3 -m venv dbt-env sudo chmod -R 777 * echo "------------------------------" echo "Activating venv in" $DBT_ENV_PATH echo "------------------------------" source dbt-env/bin/activate pip3 list echo "------------------------------" echo "Installing libraries..." echo "------------------------------" # do not use sudo, as it will install outside the venv pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1 echo "------------------------------" echo "Venv libraries..." echo "------------------------------" pip3 list dbt --version echo "------------------------------" echo "Deactivating venv..." echo "------------------------------" deactivate

En las secciones siguientes, cargará el directorio de su proyecto dbt a Amazon S3 y ejecutará un DAG que valide si Amazon MWAA ha instalado las dependencias dbt requeridas correctamente.

Carga de un proyecto de dbt en Amazon S3

Para poder utilizar un proyecto dbt con su entorno Amazon MWAA, cargue todo el directorio del proyecto a la carpeta de dags de su entorno. Cuando el entorno se actualice, Amazon MWAA descargará el directorio dbt en la carpeta local de usr/local/airflow/dags/.

Pasos para cargar un proyecto de dbt en Amazon S3
  1. Vaya al directorio en el que clonó el proyecto de inicio de dbt.

  2. Ejecute el siguiente comando de la AWS CLI de Amazon S3 para copiar el contenido del proyecto en la carpeta de dags de su entorno de forma recursiva mediante el parámetro --recursive. El comando creará un subdirectorio llamado “dbt” que puede usar para todos sus proyectos de dbt. Si el subdirectorio ya existe, los archivos del proyecto se copiarán en el directorio existente, no se creará un nuevo directorio. El comando también creará un subdirectorio dentro del directorio de dbt para este proyecto inicial específico.

    $ aws s3 cp dbt-starter-project s3://mwaa-bucket/dags/dbt/dbt-starter-project --recursive

    Puede utilizar diferentes nombres para los subdirectorios de los proyectos a fin de organizar varios proyectos de dbt dentro del directorio principal de dbt.

Uso de un DAG para verificar la instalación de la dependencia de dbt

El siguiente DAG utiliza un BashOperator y un comando de bash para comprobar si Amazon MWAA ha instalado correctamente las dependencias dbt especificadas en el archivo requirements.txt.

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command=""/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version"" )

Haga lo siguiente para ver los registros de tareas y comprobar que dbt y sus dependencias se hayan instalado.

  1. Vaya a la consola de Amazon MWAA y, a continuación, seleccione Abrir interfaz de usuario de Airflow en la lista de entornos disponibles.

  2. En la interfaz de usuario de Apache Airflow, busque el DAG de dbt-installation-test en la lista y, a continuación, elija la fecha que aparece debajo de la columna Last Run para abrir la última tarea satisfactoria.

  3. Con Vista de gráfico, elija la tarea bash_command para abrir los detalles de la instancia de la tarea.

  4. Seleccione Registro para abrir los registros de tareas y, a continuación, compruebe que muestran la versión de dbt especificada en requirements.txt correctamente.

Uso de un DAG para ejecutar un proyecto dbt

El siguiente DAG utiliza un BashOperator para copiar los proyectos dbt que ha cargado en Amazon S3 desde el directorio local usr/local/airflow/dags/ al directorio accesible para escritura /tmp y, a continuación, ejecuta el proyecto dbt. Los comandos de bash asumen un proyecto dbt inicial titulado “dbt-starter-project”. Modifique el nombre del directorio de acuerdo con el nombre del directorio de su proyecto.

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") # assumes all files are in a subfolder of DAGs called dbt with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\ cp -R /usr/local/airflow/dags/dbt /tmp;\ echo 'listing project files:';\ ls -R /tmp;\ cd /tmp/dbt/mwaa_dbt_test_project;\ /usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\ cat /tmp/dbt_logs/dbt.log;\ rm -rf /tmp/dbt/mwaa_dbt_test_project" )