Performance tuning for Apache Airflow on Amazon MWAA - Amazon Managed Workflows for Apache Airflow

Performance tuning for Apache Airflow on Amazon MWAA

This topic describes how to tune the performance of an Amazon Managed Workflows for Apache Airflow environment by using Apache Airflow configuration options on Amazon MWAA.

Adding an Apache Airflow configuration option

Use the following procedure to add an Airflow configuration option to your environment.

  1. Open the Environments page on the Amazon MWAA console.

  2. Choose an environment.

  3. Choose Edit.

  4. Choose Next.

  5. Choose Add custom configuration in the Airflow configuration options pane.

  6. Choose a configuration from the dropdown list and enter a value, or type a custom configuration and enter a value.

  7. Choose Add custom configuration for each configuration you want to add.

  8. Choose Save.

To learn more, refer to Using Apache Airflow configuration options on Amazon MWAA.
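You can also apply configuration options programmatically. The following is a minimal sketch that uses the UpdateEnvironment API through the AWS SDK for Python (Boto3); the environment name, Region, and option values are placeholders for illustration.

    import boto3

    # Create an Amazon MWAA client in the Region that hosts your environment.
    mwaa = boto3.client("mwaa", region_name="us-east-1")  # placeholder Region

    # Apply one or more Apache Airflow configuration options to the environment.
    # Keys use the "section.option" format used throughout this topic; values are strings.
    response = mwaa.update_environment(
        Name="MyAirflowEnvironment",  # placeholder environment name
        AirflowConfigurationOptions={
            "celery.sync_parallelism": "1",
            "scheduler.min_file_process_interval": "60",
        },
    )
    print(response["Arn"])

Calling UpdateEnvironment starts an environment update; the new options take effect only after the environment returns to the AVAILABLE status.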

Apache Airflow scheduler

The Apache Airflow scheduler is a core component of Apache Airflow. An issue with the scheduler can prevent DAGs from being parsed and tasks from being scheduled. For more information about Apache Airflow scheduler tuning, refer to Fine-tuning your scheduler performance in the Apache Airflow documentation website.

Parameters

This section describes the configuration options available for the Apache Airflow scheduler (Apache Airflow v2 and later) and their use cases.

Apache Airflow v3
Configuration Use case

celery.sync_parallelism

The number of processes the Celery Executor uses to sync task state.

Default: 1

You can use this option to prevent queue conflicts by limiting the number of processes the Celery Executor uses. By default, the value is set to 1 to prevent errors in delivering task logs to CloudWatch Logs. Setting the value to 0 means using the maximum number of processes, but might cause errors when delivering task logs.

scheduler.scheduler_idle_sleep_time

The number of seconds to wait between consecutive DAG file processing in the scheduler "loop."

Default: 1

You can use this option to free up CPU usage on the scheduler by increasing the time the scheduler sleeps after it finishes retrieving DAG parsing results, finding and queuing tasks, and executing queued tasks in the executor. Increasing this value keeps the scheduler threads configured in dag_processor.parsing_processes idle for longer between loops. This might reduce the capacity of the schedulers to parse DAGs, and increase the time it takes for DAGs to populate in the webserver.

scheduler.max_dagruns_to_create_per_loop

The maximum number of DAGs to create DagRuns for per scheduler "loop."

Default: 10

You can use this option to free up resources for scheduling tasks by decreasing the maximum number of DagRuns for the scheduler "loop."

dag_processor.parsing_processes

The number of processes the scheduler can run in parallel to parse DAGs.

Default: (2 * number of vCPUs) - 1

You can use this option to free up resources by decreasing the number of processes the scheduler runs in parallel to parse DAGs. We recommend keeping this number low if DAG parsing is impacting task scheduling. You must specify a value that's less than the vCPU count on your environment. To learn more, refer to Limits.
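To confirm which values are in effect on a running environment, you can read them from the Apache Airflow configuration at runtime. The following is a minimal sketch that assumes the Apache Airflow v3 option names listed above; you might run it from a PythonOperator task or a diagnostic DAG.

    from airflow.configuration import conf

    # Print the effective values of the scheduler-related options described above.
    # Option names follow the Apache Airflow v3 table; adjust them for v2 environments.
    for section, option in [
        ("celery", "sync_parallelism"),
        ("scheduler", "scheduler_idle_sleep_time"),
        ("scheduler", "max_dagruns_to_create_per_loop"),
        ("dag_processor", "parsing_processes"),
    ]:
        print(f"{section}.{option} = {conf.get(section, option, fallback='<not set>')}")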

Apache Airflow v2
Configuration Use case

celery.sync_parallelism

The number of processes the Celery Executor uses to sync task state.

Default: 1

You can use this option to prevent queue conflicts by limiting the number of processes the Celery Executor uses. By default, the value is set to 1 to prevent errors in delivering task logs to CloudWatch Logs. Setting the value to 0 means using the maximum number of processes, but might cause errors when delivering task logs.

scheduler.scheduler_idle_sleep_time

The number of seconds to wait between consecutive DAG file processing in the scheduler "loop."

Default: 1

You can use this option to free up CPU usage on the scheduler by increasing the time the scheduler sleeps after it finishes retrieving DAG parsing results, finding and queuing tasks, and executing queued tasks in the executor. Increasing this value keeps the scheduler threads configured in scheduler.parsing_processes idle for longer between loops. This might reduce the capacity of the schedulers to parse DAGs, and increase the time it takes for DAGs to populate in the webserver.

scheduler.max_dagruns_to_create_per_loop

The maximum number of DAGs to create DagRuns for per scheduler "loop."

Default: 10

You can use this option to free up resources for scheduling tasks by decreasing the maximum number of DagRuns for the scheduler "loop."

scheduler.parsing_processes

The number of processes the scheduler can run in parallel to parse DAGs.

Default: (2 * number of vCPUs) - 1

You can use this option to free up resources by decreasing the number of processes the scheduler runs in parallel to parse DAGs. We recommend keeping this number low if DAG parsing is impacting task scheduling. You must specify a value that's less than the vCPU count on your environment. To learn more, refer to Limits.

Limits

This section describes the limits to consider when adjusting the default parameters for the scheduler.

scheduler.parsing_processes, scheduler.max_threads (v2 only)

Two threads are allowed per vCPU for an environment class, and at least one thread must be reserved for the scheduler. If you notice a delay in tasks being scheduled, you might need to increase your environment class. For example, a large environment has a 4 vCPU Fargate container instance for its scheduler. This means that a maximum of 7 total threads are available for other processes: two threads multiplied by four vCPUs, minus one for the scheduler itself. The value you specify in scheduler.max_threads (v2 only) and scheduler.parsing_processes (or dag_processor.parsing_processes in Apache Airflow v3) must not exceed the number of threads available for the environment class, as listed below; a short sketch of this arithmetic follows the list:

  • mw1.small – Must not exceed 1 thread for other processes. The remaining thread is reserved for the scheduler.

  • mw1.medium – Must not exceed 3 threads for other processes. The remaining thread is reserved for the scheduler.

  • mw1.large – Must not exceed 7 threads for other processes. The remaining thread is reserved for the scheduler.
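The following is a minimal sketch that reproduces the thread arithmetic above; the vCPU counts are derived from the limits listed for each environment class, and the helper function name is illustrative.

    # Threads available for parsing processes on each environment class:
    # 2 threads per vCPU, minus 1 thread reserved for the scheduler itself.
    VCPUS_PER_CLASS = {"mw1.small": 1, "mw1.medium": 2, "mw1.large": 4}

    def threads_for_other_processes(environment_class: str) -> int:
        vcpus = VCPUS_PER_CLASS[environment_class]
        return 2 * vcpus - 1

    for env_class in VCPUS_PER_CLASS:
        print(env_class, threads_for_other_processes(env_class))
    # mw1.small -> 1, mw1.medium -> 3, mw1.large -> 7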

DAG folders

The Apache Airflow scheduler continuously scans the DAGs folder on your environment, including any contained plugins.zip files and Python (.py) files that contain "airflow" import statements. Any resulting Python DAG objects are placed into a DagBag for that file, which the scheduler then processes to determine what tasks, if any, need to be scheduled. DAG file parsing occurs regardless of whether the files contain any viable DAG objects.
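For example, a minimal DAG file picked up from the DAGs folder might look like the following sketch. The DAG ID, schedule, and task are illustrative, and the example assumes Apache Airflow v2.4 or later (earlier v2 releases use schedule_interval instead of schedule). Because the file contains "airflow" import statements, it is parsed on every pass, and the resulting DAG object is added to the DagBag.

    from datetime import datetime

    from airflow.decorators import dag, task

    # The "airflow" imports above mark this .py file for parsing by the scheduler.
    @dag(
        dag_id="example_minimal_dag",   # illustrative DAG ID
        start_date=datetime(2024, 1, 1),
        schedule=None,                  # run only when triggered manually
        catchup=False,
    )
    def example_minimal_dag():
        @task
        def noop():
            print("This task was parsed and scheduled by the Apache Airflow scheduler.")

        noop()

    example_minimal_dag()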

Parameters

This section describes the configuration options available for the DAGs folder (Apache Airflow v2 and later) and their use cases.

Apache Airflow v3
Configuration Use case

dag_processor.refresh_interval

How often, in seconds, the DAGs folder is scanned for new files.

Default: 300 seconds

You can use this option to free up resources by increasing the number of seconds to parse the DAGs folder. We recommend increasing this value if you experience long parsing times in total_parse_time metrics, which might be due to a large number of files in your DAGs folder.

dag_processor.min_file_process_interval

The minimum number of seconds that must pass before the scheduler parses a DAG file again and updates to the DAG are reflected.

Default: 30 seconds

You can use this option to free up resources by increasing the number of seconds that the scheduler waits before parsing a DAG file again. For example, if you specify a value of 30, the DAG file is parsed at most once every 30 seconds. We recommend keeping this number high to decrease the CPU usage on your environment.
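As a rough way to reason about these intervals together, you can estimate how long one full pass over the DAGs folder takes and compare it with the refresh and re-parse intervals. The following sketch uses illustrative numbers; the file count, average parse time, and parsing process count are assumptions, not measurements.

    # Rough estimate of one full parse pass over the DAGs folder.
    dag_files = 500              # assumed number of .py files in the DAGs folder
    avg_parse_seconds = 2.0      # assumed average parse time per file (compare total_parse_time)
    parsing_processes = 7        # for example, an mw1.large scheduler (see Limits above)

    full_pass_seconds = dag_files * avg_parse_seconds / parsing_processes
    print(f"Approximate time for one full parse pass: {full_pass_seconds:.0f} seconds")

    # If this estimate is much larger than dag_processor.min_file_process_interval, that
    # interval is not your bottleneck; consider raising dag_processor.refresh_interval or
    # reducing the number of files in the DAGs folder instead.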

Apache Airflow v2
Configuration Use case

scheduler.dag_dir_list_interval

How often, in seconds, the DAGs folder is scanned for new files.

Default: 300 seconds

You can use this option to free up resources by increasing the number of seconds to parse the DAGs folder. We recommend increasing this value if you experience long parsing times in total_parse_time metrics, which might be due to a large number of files in your DAGs folder.

scheduler.min_file_process_interval

The minimum number of seconds that must pass before the scheduler parses a DAG file again and updates to the DAG are reflected.

Default: 30 seconds

You can use this option to free up resources by increasing the number of seconds that the scheduler waits before parsing a DAG file again. For example, if you specify a value of 30, the DAG file is parsed at most once every 30 seconds. We recommend keeping this number high to decrease the CPU usage on your environment.

DAG files

As part of the Apache Airflow scheduler loop, individual DAG files are parsed to extract DAG Python objects. In Apache Airflow v2 and later, the scheduler parses files in parallel, up to the number of parsing processes configured in scheduler.parsing_processes (v2) or dag_processor.parsing_processes (v3), at the same time. The number of seconds specified in scheduler.min_file_process_interval (v2) or dag_processor.min_file_process_interval (v3) must pass before the same file is parsed again.

Parameters

This section describes the configuration options available for Apache Airflow DAG files (Apache Airflow v2 and later) and their use cases.

Apache Airflow v3
Configuration Use case

dag_processor.dag_file_processor_timeout

The number of seconds before the DagFileProcessor times out processing a DAG file.

Default: 50 seconds

You can use this option to free up resources by increasing the time it takes before the DagFileProcessor times out. We recommend increasing this value if you experience timeouts in your DAG processing logs that result in no viable DAGs being loaded.

core.dagbag_import_timeout

The number of seconds before importing a Python file times out.

Default: 30 seconds

You can use this option to free up resources by increasing the time it takes before the scheduler times out while importing a Python file to extract the DAG objects. This option is processed as part of the scheduler "loop," and must contain a value less than the value specified in dag_processor.dag_file_processor_timeout. A configuration sketch that keeps this relationship appears after this table.

core.min_serialized_dag_update_interval

The minimum number of seconds after which serialized DAGs in the database are updated.

Default: 30

You can use this option to free up resources by increasing the number of seconds after which serialized DAGs in the database are updated. We recommend increasing this value if you have a large number of DAGs, or complex DAGs. Increasing this value reduces the load on the scheduler and the database as DAGs are serialized.

core.min_serialized_dag_fetch_interval

The number of seconds a serialized DAG is re-fetched from the database when already loaded in the DagBag.

Default: 10

You can use this option to free up resources by increasing the number of seconds a serialized DAG is re-fetched. The value must be greater than the value specified in core.min_serialized_dag_update_interval to reduce database "write" rates. Increasing this value reduces the load on the webserver and the database as DAGs are serialized.
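If you raise the DAG file timeouts described earlier in this table, keep core.dagbag_import_timeout below dag_processor.dag_file_processor_timeout. The following is a minimal sketch using the UpdateEnvironment API through Boto3; the environment name and timeout values are illustrative.

    import boto3

    dag_file_processor_timeout = 120   # illustrative value, in seconds
    dagbag_import_timeout = 90         # must stay below the processor timeout

    # Guard against violating the constraint described above.
    assert dagbag_import_timeout < dag_file_processor_timeout

    boto3.client("mwaa").update_environment(
        Name="MyAirflowEnvironment",   # placeholder environment name
        AirflowConfigurationOptions={
            "dag_processor.dag_file_processor_timeout": str(dag_file_processor_timeout),
            "core.dagbag_import_timeout": str(dagbag_import_timeout),
        },
    )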

Apache Airflow v2
Configuration Use case

core.dag_file_processor_timeout

The number of seconds before the DagFileProcessor times out processing a DAG file.

Default: 50 seconds

You can use this option to free up resources by increasing the time it takes before the DagFileProcessor times out. We recommend increasing this value if you experience timeouts in your DAG processing logs that result in no viable DAGs being loaded.

core.dagbag_import_timeout

The number of seconds before importing a Python file times out.

Default: 30 seconds

You can use this option to free up resources by increasing the time it takes before the scheduler times out while importing a Python file to extract the DAG objects. This option is processed as part of the scheduler "loop," and must contain a value less than the value specified in core.dag_file_processor_timeout.

core.min_serialized_dag_update_interval

The minimum number of seconds after which serialized DAGs in the database are updated.

Default: 30

You can use this option to free up resources by increasing the number of seconds after which serialized DAGs in the database are updated. We recommend increasing this value if you have a large number of DAGs, or complex DAGs. Increasing this value reduces the load on the scheduler and the database as DAGs are serialized.

core.min_serialized_dag_fetch_interval

The number of seconds a serialized DAG is re-fetched from the database when already loaded in the DagBag.

Default: 10

You can use this option to free up resources by increasing the number of seconds a serialized DAG is re-fetched. The value must be greater than the value specified in core.min_serialized_dag_update_interval to reduce database "write" rates. Increasing this value reduces the load on the webserver and the database as DAGs are serialized.

Tasks

The Apache Airflow scheduler and workers are both involved in queuing and de-queuing tasks. The scheduler takes parsed tasks that are ready to schedule from the None status to the Scheduled status. The executor, which also runs on the scheduler container in Fargate, queues those tasks and sets their status to Queued. When a worker has capacity, it takes the task from the queue and sets the status to Running; the status subsequently changes to Success or Failed based on whether the task succeeds or fails.

Parameters

This section describes the configuration options available for Apache Airflow tasks and their use cases.

Amazon MWAA overrides the Apache Airflow base install defaults for some of the following options; these overrides are noted in the option descriptions.

Apache Airflow v3
Configuration Use case

core.parallelism

The maximum number of task instances that can have a Running status.

Default: Dynamically set based on (maxWorkers * maxCeleryWorkers) / schedulers * 1.5.

You can use this option to free up resources by increasing the number of task instances that can run simultaneously. The value specified must be the number of available workers multiplied by the workers' task density. We recommend changing this value only when you experience a large number of tasks stuck in the “Running” or “Queued” state. A worked example of the default formula appears after this table.

core.execute_tasks_new_python_interpreter

Determines whether Apache Airflow executes tasks by forking the parent process, or by creating a new Python process.

Default: True

When set to True, Apache Airflow recognizes changes that you make to your plugins because a new Python process is created to execute each task.

celery.worker_concurrency

Amazon MWAA overrides the Airflow base install for this option to scale workers as part of its autoscaling component.

Default: Not applicable

Any value specified for this option is ignored.

celery.worker_autoscale

The task concurrency for workers.

Defaults:

  • mw1.micro - 3,0

  • mw1.small - 5,0

  • mw1.medium - 10,0

  • mw1.large - 20,0

  • mw1.xlarge - 40,0

  • mw1.2xlarge - 80,0

You can use this option to free up resources by reducing the maximum and minimum task concurrency of workers (specified as a maximum,minimum pair). Workers accept up to the configured maximum number of concurrent tasks, regardless of whether there are sufficient resources to do so. If tasks are scheduled without sufficient resources, the tasks immediately fail. We recommend changing this value for resource-intensive tasks by reducing the values to less than the defaults to allow more capacity per task.
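As a worked example of the dynamic core.parallelism default above, consider a hypothetical environment with 10 maximum workers, 2 schedulers, and the mw1.large celery.worker_autoscale maximum of 20 tasks per worker. Interpreting maxCeleryWorkers as that per-worker maximum is an assumption, and all numbers are illustrative.

    # Worked example of the dynamic core.parallelism default:
    # (maxWorkers * maxCeleryWorkers) / schedulers * 1.5
    max_workers = 10          # assumed maximum worker count for the environment
    max_celery_workers = 20   # mw1.large worker_autoscale maximum from the table above
    schedulers = 2            # assumed scheduler count

    parallelism = (max_workers * max_celery_workers) / schedulers * 1.5
    print(parallelism)  # 150.0 task instances allowed in the Running state at once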

Apache Airflow v2
Configuration Use case

core.parallelism

The maximum number of task instances that can have a Running status.

Default: Dynamically set based on (maxWorkers * maxCeleryWorkers) / schedulers * 1.5.

You can use this option to free up resources by increasing the number of task instances that can run simultaneously. The value specified must be the number of available workers multiplied by the workers' task density. We recommend changing this value only when you experience a large number of tasks stuck in the “Running” or “Queued” state.

core.dag_concurrency

The number of task instances allowed to run concurrently for each DAG.

Default: 10000

You can use this option to free up resources by increasing the number of task instances allowed to run concurrently. For example, if you have one hundred DAGs with ten parallel tasks, and you want all DAGs to run concurrently, you can calculate the maximum parallelism as the number of available workers multiplied by the workers' task density in celery.worker_concurrency, divided by the number of DAGs.

core.execute_tasks_new_python_interpreter

Determines whether Apache Airflow executes tasks by forking the parent process, or by creating a new Python process.

Default: True

When set to True, Apache Airflow recognizes changes that you make to your plugins because a new Python process is created to execute each task.

celery.worker_concurrency

Amazon MWAA overrides the Airflow base install for this option to scale workers as part of its autoscaling component.

Default: Not applicable

Any value specified for this option is ignored.

celery.worker_autoscale

The task concurrency for workers.

Defaults:

  • mw1.micro - 3,0

  • mw1.small - 5,0

  • mw1.medium - 10,0

  • mw1.large - 20,0

  • mw1.xlarge - 40,0

  • mw1.2xlarge - 80,0

You can use this option to free up resources by reducing the maximum and minimum task concurrency of workers (specified as a maximum,minimum pair). Workers accept up to the configured maximum number of concurrent tasks, regardless of whether there are sufficient resources to do so. If tasks are scheduled without sufficient resources, the tasks immediately fail. We recommend changing this value for resource-intensive tasks by reducing the values to less than the defaults to allow more capacity per task.
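For example, to give resource-intensive tasks more headroom, you could lower the worker autoscale pair below its default for the environment class. The following is a minimal sketch using the UpdateEnvironment API through Boto3; the environment name and the 10,0 value are illustrative.

    import boto3

    # Lower the per-worker task concurrency (maximum,minimum) so that each
    # resource-intensive task gets more CPU and memory headroom. 20,0 is the
    # mw1.large default listed above.
    boto3.client("mwaa").update_environment(
        Name="MyAirflowEnvironment",   # placeholder environment name
        AirflowConfigurationOptions={"celery.worker_autoscale": "10,0"},
    )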