在 Amazon 上使用 dbt MWAA - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon 上使用 dbt MWAA

本主题演示了如何在亚马逊上使用 dbt 和 Postgres。MWAA在以下步骤中,您将所需的依赖项添加到 requirements.txt 中,并将示例 dbt 项目上传到环境的 Amazon S3 存储桶。然后,您将使用示例DAG来验证 Amazon 是否MWAA已安装依赖项,最后使用BashOperator来运行 dbt 项目。

版本

先决条件

在完成以下步骤之前,您需要具备以下条件:

  • 使用 Apache Airflow v2.2 的亚马逊MWAA环境。此示例已编写,并使用 v2.2.2 进行了测试。您可能需要修改示例以与其他 Apache Airflow 版本一起使用。

  • dbt 项目示例。要开始在 Amazon 上使用 dbtMWAA,你可以创建一个分支并从 dbt-labs 存储库中克隆 dbt 入门项目。 GitHub

依赖项

要将 Amazon MWAA 与 dbt 配合使用,请在您的环境中添加以下启动脚本。要了解更多信息,请参阅在 Amazon 上使用启动脚本MWAA

#!/bin/bash if [[ "${MWAA_AIRFLOW_COMPONENT}" != "worker" ]] then exit 0 fi echo "------------------------------" echo "Installing virtual Python env" echo "------------------------------" pip3 install --upgrade pip echo "Current Python version:" python3 --version echo "..." sudo pip3 install --user virtualenv sudo mkdir python3-virtualenv cd python3-virtualenv sudo python3 -m venv dbt-env sudo chmod -R 777 * echo "------------------------------" echo "Activating venv in" $DBT_ENV_PATH echo "------------------------------" source dbt-env/bin/activate pip3 list echo "------------------------------" echo "Installing libraries..." echo "------------------------------" # do not use sudo, as it will install outside the venv pip3 install dbt-redshift==1.6.1 dbt-postgres==1.6.1 echo "------------------------------" echo "Venv libraries..." echo "------------------------------" pip3 list dbt --version echo "------------------------------" echo "Deactivating venv..." echo "------------------------------" deactivate

在以下各节中,您将将 dbt 项目目录上传到 Amazon S3DAG,然后运行验证亚马逊是否MWAA已成功安装所需的 dbt 依赖项。

将 dbt 项目上传到 Amazon S3

为了能够在您的 Amazon MWAA 环境中使用 dbt 项目,您可以将整个项目目录上传到您的环境dags文件夹。当环境更新时,Amazon 会将 dbt 目录MWAA下载到本地usr/local/airflow/dags/文件夹。

要将 dbt 项目上传到 Amazon S3,请执行以下操作
  1. 导航到您克隆 dbt 入门项目的目录。

  2. 运行以下 Amazon S3 AWS CLI 命令,使用--recursive参数以递归方式将项目内容复制到您的环境dags文件夹。该命令会创建一个名为 dbt 的子目录,您可以将其用于所有 dbt 项目。如果子目录已经存在,则项目文件将被复制到现有目录中,并且不会创建新目录。该命令还会为该特定入门项目在 dbt 目录中创建一个子目录。

    $ aws s3 cp dbt-starter-project s3://mwaa-bucket/dags/dbt/dbt-starter-project --recursive

    您可以为项目子目录使用不同的名称,以便在 dbt 父目录中组织多个 dbt 项目。

使用DAG来验证 dbt 依赖关系的安装

以下内容DAG使用BashOperator和 bash 命令来验证 Amazon 是否MWAA已成功安装中指定的 dbt 依赖项。requirements.txt

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="dbt-installation-test", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command=""/usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt --version"" )

执行以下操作以查看任务日志并验证是否已安装 dbt 及其依赖项。

  1. 导航至 Amazon MWAA 控制台,然后从可用环境列表中选择 Open Airflow UI

  2. 在 Apache Airflow 用户界面上,dbt-installation-testDAG从列表中找到,然后在该Last Run列下选择打开最后一个成功任务的日期。

  3. 使用图表视图,选择 bash_command 任务以打开任务实例的详细信息。

  4. 选择日志来打开任务日志,然后验证日志是否成功列出了我们在 requirements.txt 中指定的 dbt 版本。

使用DAG来运行 dbt 项目

以下内容DAG使用将您上传到 Amazon S3 的 dbt 项目从本地usr/local/airflow/dags/目录复制到可写入的/tmp目录,然后运行 dbt 项目。BashOperatorbash 命令假设一个名为 dbt-starter-project 的 入门 dbt 项目。根据您项目目录的名称修改目录名称。

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") # assumes all files are in a subfolder of DAGs called dbt with DAG(dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="source /usr/local/airflow/python3-virtualenv/dbt-env/bin/activate;\ cp -R /usr/local/airflow/dags/dbt /tmp;\ echo 'listing project files:';\ ls -R /tmp;\ cd /tmp/dbt/mwaa_dbt_test_project;\ /usr/local/airflow/python3-virtualenv/dbt-env/bin/dbt run --project-dir /tmp/dbt/mwaa_dbt_test_project --profiles-dir ..;\ cat /tmp/dbt_logs/dbt.log;\ rm -rf /tmp/dbt/mwaa_dbt_test_project" )