添加或更新 DAGs - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

添加或更新 DAGs

有向无环图 (DAGs) 在 Python 文件中定义,该DAG文件将结构定义为代码。您可以使用 AWS CLI、或 Amazon S3 控制台上传DAGs到您的环境。本主题介绍使用 Amazon S3 存储桶中的dags文件夹DAGs在适用于 Apache Airflow 的亚马逊托管工作流程环境中添加或更新 Apache Airflow 的步骤。

先决条件

在完成本页上的步骤之前,您需要具备以下条件。

工作方式

有向无环图 (DAG) 是在单个 Python 文件中定义DAG的,该文件将结构定义为代码。它包含以下各项:

要在亚马逊MWAA环境中运行 Apache Airflow 平台,您需要将DAG定义复制到存储dags桶中的文件夹。例如,存储桶中的DAG文件夹可能如下所示:

例 DAG文件夹
dags/ └ dag_def.py

Amazon 每 30 秒MWAA自动将新建和更改的对象从您的 Amazon S3 存储桶同步到亚马逊MWAA计划程序和工作容器的/usr/local/airflow/dags文件夹,从而保留 Amazon S3 源的文件层次结构,无论文件类型如何。新DAGs出现在 Apache Airflow 用户界面中的时间由以下因素控制。scheduler.dag_dir_list_interval对现有内容的更改DAGs将在下一个DAG处理循环中获取。

注意

您无需在DAG文件夹中包含airflow.cfg配置文件。您可以从亚马逊MWAA控制台覆盖默认 Apache Airflow 配置。有关更多信息,请参阅 在亚马逊上使用 Apache Airflow 配置选项 MWAA

v2 中发生了什么变化

  • 新增:运算符、挂钩和执行程序。你中的导入语句以及你在亚马逊plugins.zip上指定的自定义插件MWAA已在 Apache Airflow v1 和 Apache Airflow v2 之间发生了变化。DAGs例如,,Apache Airflow v1 中的 from airflow.contrib.hooks.aws_hook import AwsHook 已更改为 Apache Airflow v2 中的 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook。要了解更多信息,请参阅 Apache Airflow API参考指南中的 Python 参考

DAGs使用 Amazon MWAA CLI 实用工具进行测试

  • 命令行界面 (CLI) 实用程序在本地复制适用于 Apache Airflow 的亚马逊托管工作流程。

  • 在本地CLI构建 Docker 容器镜像,类似于亚马逊MWAA生产镜像。这允许您在部署到亚马逊之前运行本地 Apache Airflow 环境来开发和测试DAGs自定义插件和依赖项。MWAA

  • 要运行CLI,请参阅 aws-mwaa-local-runneron GitHub。

将DAG代码上传到亚马逊 S3

您可以使用 Amazon S3 控制台或 AWS Command Line Interface (AWS CLI) 将DAG代码上传到您的 Amazon S3 存储桶。以下步骤假设您正在将代码 (.py) 上传到 Amazon S3 存储桶中名为 dags 的文件夹。

使用 AWS CLI

AWS Command Line Interface (AWS CLI) 是一个开源工具,可让您使用命令行 shell 中的命令与 AWS 服务进行交互。要完成本节中的步骤,您需要以下满足以下条件:

要使用上传 AWS CLI
  1. 以下示例列出所有 Amazon S3 存储桶。

    aws s3 ls
  2. 使用以下命令列出 Amazon S3 存储桶中适合环境的文件和文件夹。

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  3. 以下命令将 dag_def.py 文件上传到 dags 文件夹。

    aws s3 cp dag_def.py s3://YOUR_S3_BUCKET_NAME/dags/

    如果 Amazon S3 存储桶中尚不存在名为 dags 的文件夹,则此命令会创建 dags 文件夹,并将名为 dag_def.py 的文件上传到新文件夹。

使用 Amazon S3 控制台

Amazon S3 控制台是一个基于 Web 的UI ,允许您创建和管理 Amazon S3 桶中的资源。以下步骤假设您有一个名为DAGs的文件夹dags

要使用 Amazon S3 控制台上传,请执行以下操作
  1. 在 Amazon MWAA 控制台上打开 “环境” 页面

  2. 选择环境。

  3. S3 窗格的DAG代码中选择 S3 存储桶链接,在 Amazon S3 控制台上打开您的存储桶。

  4. 选择 dags 文件夹。

  5. 选择上传

  6. 选择 添加文件

  7. 选择 dag_def.py 的本地副本,选择上传

在亚马逊MWAA控制台上指定您的DAGs文件夹路径(第一次)

以下步骤假设您要在 Amazon S3 桶中指定名为 dags 文件夹的路径。

  1. 在 Amazon MWAA 控制台上打开 “环境” 页面

  2. 选择要运行的环境DAGs。

  3. 选择编辑

  4. 在 Amazon S3 窗格的DAG代码中,选择DAG文件夹字段旁边的浏览 S3

  5. 选择 dags 文件夹。

  6. 选择选择

  7. 选择下一步更新环境

在 Apache Airflow UI 上查看更改

登录 Apache Airflow

你需要在 AWS Identity and Access Management (IAM) 中拥有 AWS 账户的Apache Airflow 用户界面访问策略:A mazonMWAAWeb ServerAccess权限才能查看你的 Apache Airflow 用户界面。

要访问 Apache Airflow UI,请执行以下操作
  1. 在 Amazon MWAA 控制台上打开 “环境” 页面

  2. 选择环境。

  3. 选择打开 Airflow UI

接下来做什么?

  • 使用 on 在本地测试你的DAGs自定义插件和 Python 依赖关系 GitHub。aws-mwaa-local-runner