本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
添加或更新 DAG
有向无环图(DAG)在 Python 文件中定义,该文件将 DAG 的结构定义为代码。您可以使用 AWS CLI 或 Amazon S3 控制台将 DAG 上传到环境。本页介绍使用 Amazon S3 存储桶中的 dags
文件夹,在 Amazon MWAA 环境中添加或更新 Apache Airflow DAG 的步骤。
小节目录
先决条件
在完成本页上的步骤之前,您需要具备以下条件。
-
权限 — AWS 账户必须已获得管理员授权,访问适用于环境的 AmazonMWAAFullConsoleAccess 访问控制策略。此外,执行角色必须允许 Amazon MWAA 环境访问环境所使用的 AWS 资源。
-
访问权限-如果您需要访问公共存储库才能直接在 Web 服务器上安装依赖项,则必须将环境配置为具有公共网络 Web 服务器访问权限。有关更多信息,请参阅 Apache Airflow 访问模式。
-
Amazon S3 配置 — 用于存储 DAG 的 Amazon S3 存储桶、在
plugins.zip
中的自定义插件和在requirements.txt
中的 Python 依赖项必须配置为已阻止公共访问和已启用版本控制。
工作原理
有向无环图(DAG)在单个 Python 文件中定义,该文件将 DAG 的结构定义为代码。它包含以下各项:
要在 Amazon MWAA 环境中运行 Apache Airflow 平台,您需要将 DAG 定义复制到存储桶中的 dags
文件夹。例如,存储桶中的 DAG 文件夹可能如下所示:
例 DAG 文件夹
dags/ └ dag_def.py
Amazon MWAA 每 30 秒自动将新建和更改的对象从 Amazon S3 存储桶同步到 Amazon MWAA 计划程序和工作线程容器的 /usr/local/airflow/dags
文件夹,从而保留 Amazon S3 源的文件层次结构,无论文件类型如何。新 DAG 出现在 Apache Airflow UI 中所需的时间由 scheduler.dag_dir_list_interval
控制。对现有 DAG 的更改将在下一个 DAG 处理循环中获取。
注意
您不需要在 DAG 文件夹中包含 airflow.cfg
配置文件。您可以从Amazon MWAA 控制台覆盖默认 Apache Airflow 配置。有关更多信息,请参阅 在 Amazon MWAA 上使用 Apache Airflow 配置选项。
v2 中发生了什么变化
-
新增:运算符、挂钩和执行程序。DAG 中的导入语句以及在 Amazon MWAA 上您在
plugins.zip
中指定的自定义插件在 Apache Airflow v1 和 Apache Airflow v2 之间发生了变化。例如,,Apache Airflow v1 中的from airflow.contrib.hooks.aws_hook import AwsHook
已更改为 Apache Airflow v2 中的from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
。要了解更多信息,请参阅《Apache Airflow 参考指南》中的 Python API 参考。
使用 Amazon MWAA CLI 实用工具测试 DAG
-
命令行界面(CLI)实用工具可在本地复制 Amazon MWAA 环境。
-
CLI 在本地构建 Docker 容器镜像,类似于 Amazon MWAA 生产镜像。这允许您在部署到 Amazon MWAA 之前运行本地 Apache Airflow 环境来开发和测试 DAG、自定义插件和依赖项。
-
要运行 CLI,请参阅 GitHub 上的 aws-mwaa-local-runner
。
将 DAG 代码上传到 Amazon S3
您可以使用 Amazon S3 控制台或 AWS Command Line Interface(AWS CLI)将 DAG 代码上传到 Amazon S3 存储桶中。以下步骤假设您正在将代码(.py
)上传到 Amazon S3 存储桶中名为 dags
的文件夹。
使用 AWS CLI
AWS Command Line Interface(AWS CLI)是一种开源工具,让您能够在命令行 Shell 中使用命令与 AWS 服务进行交互。要完成本节中的步骤,您需要以下满足以下条件:
要使用 AWS CLI 上传,请执行以下操作
-
以下示例列出所有 Amazon S3 存储桶。
aws s3 ls
-
使用以下命令列出 Amazon S3 存储桶中适合环境的文件和文件夹。
aws s3 ls s3://
YOUR_S3_BUCKET_NAME
-
以下命令将
dag_def.py
文件上传到dags
文件夹。aws s3 cp dag_def.py s3://
YOUR_S3_BUCKET_NAME
/dags/如果 Amazon S3 存储桶中尚不存在名为
dags
的文件夹,则此命令会创建dags
文件夹,并将名为dag_def.py
的文件上传到新文件夹。
使用 Amazon S3 控制台
Amazon S3 控制台是一个基于 Web 的UI ,允许您创建和管理 Amazon S3 桶中的资源。以下步骤假设您有一个名为 dags
的 DAG 文件夹。
要使用 Amazon S3 控制台上传,请执行以下操作
-
在 Amazon MWAA 控制台上打开环境页面
。 -
选择环境。
-
在 S3 中的 DAG 代码窗格中选择 S3 存储桶链接,在 Amazon S3 控制台上打开存储桶。
-
选择
dags
文件夹。 -
请选择 Upload(上传)。
-
选择 添加文件。
-
选择
dag_def.py
的本地副本,选择上传。
在 Amazon MWAA 控制台上指定 DAG 文件夹的路径(第一次)
以下步骤假设您要在 Amazon S3 桶中指定名为 dags
文件夹的路径。
-
在 Amazon MWAA 控制台上打开环境页面
。 -
选择要在其中运行 DAG 的环境。
-
选择编辑。
-
在 Amazon S3 中的 DAG 代码窗格上,选择DAG 文件夹字段旁边的浏览 S3。
-
选择
dags
文件夹。 -
选择选择。
-
选择下一步、更新环境。
在 Apache Airflow UI 上查看更改
登录 Apache Airflow
您需要在 AWS Identity and Access Management(IAM)中拥有 AWS 账户的 Apache Airflow 用户界面访问策略:A mazonMWAAWeb ServerAccess 权限才能查看 Apache Airflow UI。
要访问 Apache Airflow UI,请执行以下操作
-
在 Amazon MWAA 控制台上打开环境页面
。 -
选择环境。
-
选择打开 Airflow UI。
接下来做什么?
-
使用 GitHub 上的 aws-mwaa-local-runner
在本地测试 DAG、自定义插件和 Python 依赖项。