Amazon MWAA 上的 Apache Airflow 的性能调整 - Amazon Managed Workflows for Apache Airflow

Amazon MWAA 上的 Apache Airflow 的性能调整

本页介绍了在使用 在 Amazon MWAA 上使用 Apache Airflow 配置选项 调整 Amazon MWAA 环境的性能时,我们推荐的最佳实践。

添加 Apache Airflow 配置选项。

以下过程将指导您完成将 Airflow 配置选项添加到环境中的步骤。

  1. 在 Amazon MWAA 控制台上打开环境页面

  2. 选择环境。

  3. 选择编辑

  4. 选择下一步

  5. Airflow 配置选项窗格中选择添加自定义配置。

  6. 从下拉列表中选择配置并输入值,或者键入自定义配置并输入值。

  7. 为每个您想要添加的配置选择添加自定义配置选项

  8. 选择保存

要了解更多信息,请参阅 在 Amazon MWAA 上使用 Apache Airflow 配置选项

Apache Airflow 计划程序

Apache Airflow 计划程序是 Apache Airflow 的核心组件。计划程序出现问题可能会导致无法解析 DAG 且无法调度任务。有关 Apache Airflow 计划程序调整的更多信息,请参阅 Apache Airflow 文档网站中的微调计划程序性能

参数

本节介绍可用于 Apache Airflow 计划程序的配置选项及其用例。

Apache Airflow v2
版本 配置选项 默认值 描述 应用场景

v2

celery.sync_parallelism

1

Celery 执行程序用于同步任务状态的进程数。

您可以使用此选项通过限制 Celery 执行程序使用的进程来防止队列冲突。默认情况下,将值设为 1,以防止在向 CloudWatch Logs 传送任务日志时出现错误。将该值设为 0 意味着使用最大进程数,但在传送任务日志时可能会导致错误。

v2

scheduler.idle_sleep_time

1

计划程序“循环”中连续处理 DAG 文件之间等待的秒数。

在计划程序检索 DAG 解析结果、查找和排队任务以及在执行程序中执行排队任务后您可以使用此选项通过延长计划程序的休眠时间来释放计划程序上的 CPU 使用率。增加此值会消耗在 Apache Airflow v2 的 scheduler.parsing_processes 和 Apache Airflow v1 的 scheduler.max_threads 的环境中运行的计划程序线程数。这可能会降低计划程序解析 DAG 的容量,并增加 DAG 出现在 Web 服务器上的时间。

v2

scheduler.max_dagruns_to_create_per_loop

10

为每个计划程序“循环”创建 DagRuns 的最大数量。

您可以使用此选项通过减少计划程序“循环”的DagRuns的最大 次数来释放用于调度任务的资源。

v2

scheduler.parsing_processes

默认情况下使用以下公式进行设置:(2 * number of vCPUs) - 1

计划程序可以并行运行以调度 DAG 的线程数。

您可以使用此选项通过减少计划程序并行运行解析 DAG 的进程数来释放资源。如果 DAG 解析影响任务调度,我们建议将此数量保持在较低水平。您必须指定一个小于环境中 vCPU 计数的值。要了解更多信息,请参阅限制

限制

本节介绍调整计划程序的默认参数时应考虑的限制。

scheduler.parsing_processes、scheduler.max_threads

对于环境类,每个 vCPU 允许使用两个线程。必须为环境类的计划程序保留至少一个线程。如果您发现任务计划出现延迟,则可能需要增加环境。例如,大型环境为其计划程序设有一个 4 vCPU 的 Fargate 容器实例。这意味着可供其他进程使用的 7 个线程总数的上限。也就是说,两个线程乘以四个 vCPU,计划程序本身减一。您在 scheduler.max_threadsscheduler.parsing_processes 中指定的值不得超过环境类的可用线程数(如下所示:

  • mw1.small — 其他进程不得超过 1 个线程。剩下的线程是为计划程序保留的。

  • mw1.medium — 其他进程不得超过 3 个线程。剩下的线程是为计划程序保留的。

  • mw1.large — 其他进程不得超过 7 个线程。剩下的线程是为计划程序保留的。

DAG 文件夹

Apache Airflow 计划程序 会持续扫描环境中的 DAG 文件夹。任何包含的 plugins.zip 文件,或包含“Airflow”导入语句的 Python (.py) 文件。然后,所有生成的 Python DAG 对象都将放入 DagBag 中,以便计划程序处理该文件,以确定需要调度哪些任务(如有)。无论文件是否包含任何可行的 DAG 对象,都会进行 DAG 文件解析。

参数

本节介绍可用于 DAG 文件夹的配置选项及其用例。

Apache Airflow v2
版本 配置选项 默认值 描述 应用场景

v2

scheduler.dag_dir_list_interval

300 秒

应扫描 DAG 文件夹以查找新文件的秒数。

您可以使用此选项通过增加解析 DAG 文件夹的秒数来释放资源。如果您发现 total_parse_time metrics 中的解析时间较长(这可能是由于 DAG 文件夹中有大量文件所致),我们建议您加大此值。

v2

scheduler.min_file_process_interval

30 秒

反映计划程序解析 DAG 并更新 DAG 之后的秒数。

您可以使用此选项通过增加计划程序在解析 DAG 之前等待的秒数来释放资源。例如,如果指定 30 的值,则将每 30 秒解析一次 DAG 文件。我们建议将此秒数保持在较高水平,以减小环境上的 CPU 使用率。

DAG 文件

作为 Apache Airflow 计划程序循环的一部分,将解析单个 DAG 文件以提取 DAG Python 对象。在 Apache Airflow v2 及更高版本中,计划程序可以同时解析的解析进程的最大数量。在再次解析同一个文件之前,scheduler.min_file_process_interval 中指定的秒数必须传递。

参数

本节介绍可用于 Apache Airflow DAG 文件的配置选项及其用例。

Apache Airflow v2
版本 配置选项 默认值 描述 应用场景

v2

core.dag_file_processor_timeout

50 秒

DagFileProcessor 处理 DAG 文件超时前的秒数。

您可以使用此选项通过延长 DagFileProcessor 超时之前所需的时间来释放资源。如果您在 DAG 处理日志中看到超时导致无法加载可行的 DAG,我们建议您延长此值。

v2

core.dagbag_import_timeout

30 秒

导入 Python 文件之前的秒数超时。

在导入 Python 文件来提取 DAG 对象的同时延长 计划程序超时之前所需的时间,您可以使用此选项来释放资源。此选项作为计划程序“循环”的一部分进行处理,并且必须包含一个小于core.dag_file_processor_timeout 中指定值的值。

v2

core.min_serialized_dag_update_interval

30

更新数据库中序列化的 DAG 之后的最小秒数。

通过增加更新数据库中序列化的 DAG 之后的秒数,您可以使用此选项来释放资源。如果您有大量 DAG 或 DAG 复杂,我们建议您增加此值。当 DAG 被序列化时,增加此值可减少计划程序和数据库的负载。

v2

core.min_serialized_dag_fetch_interval

10

序列化的 DAG 已加载到 dagBag 中后从数据库中重新提取的秒数。

通过增加序列化的 DAG 重新提取的秒数,您可以使用此选项来释放资源。该值必须大于 core.min_serialized_dag_update_interval 中指定的值才能降低数据库的“写入”速率。当 DAG 被序列化时,增加此值可减少Web 服务器和数据库的负载。

任务

Apache Airflow 计划程序和工作线程都参与排队和出队任务。计划程序将已解析的准备调度的任务从状态变为已计划状态。也在 Fargate 的计划程序容器上运行的执行程序,对这些任务进行排队并将其状态设置为已排队。当工作线程有容量时,它会从队列中提取任务并将状态设置为正在运行,然后根据任务成功还是失败将其状态更改为成功失败

参数

本节介绍可用于 Apache Airflow 任务的配置选项及其用例。

Amazon MWAA 覆盖的默认配置选项标记为红色

Apache Airflow v2
版本 配置选项 默认值 描述 应用场景

v2

core.parallelism

基于 (maxWorkers * maxCeleryWorkers) / schedulers * 1.5 动态设置。

状态为“正在运行”的最大任务实例数。

通过增加可以同时运行的任务实例数,您可以使用此选项来释放资源。指定的值应为可用工作线程数“乘以”工作线程任务密度。我们建议您仅在看到大量任务处于“正在运行”或“已排队”状态时才更改此值。

v2

core.dag_concurrency

10000

允许为每个 DAG 同时运行的任务实例数。

通过增加可以并发运行的任务实例数,您可以使用此选项来释放资源。例如,如果您有一百个 DAG 和十个并行任务,并且您希望所有 DAG 并发运行,则可以将最大并行度计算为可用工作线程数“乘以”工作线程任务密度celery.worker_concurrency,除以 DAG 的数量(例如 100)。

v2

core.execute_tasks_new_python_interpreter

True

确定 Apache Airflow 是通过分叉父进程还是通过创建新的 Python 进程来执行任务。

设置为 True 时,Apache Airflow 会将您对插件所做的更改识别为为执行任务而创建的新 Python 进程。

v2

celery.worker_concurrency

不适用

Amazon MWAA 会覆盖此选项的 Airflow 基础版安装,以扩展工作线程作为其自动扩缩组件的一部分。

为此选项指定的任何值都将被忽略。

v2

celery.worker_autoscale

mw1.small - 5,0

mw1.medium - 10,0

mw1.large - 20,0

mw1.xlarge – 40,0

mw1.2xlarge – 80,0

工作线程的任务并发度。

通过降低工作线程maximumminimum任务并发度,您可以使用此选项来释放资源。无论是否有足够的资源这样做,工作线程最多可以接受配置的 maximum 并发任务。如果在没有足够资源的情况下调度任务,则任务会立即失败。我们建议为资源密集型任务更改此值,方法是将该值减少到小于默认值,以允许每个任务有更多容量。