Amazon MWAA での Apache Airflow のパフォーマンス調整
このページでは、Amazon MWAA での Apache Airflow 構成オプションの使用 を使用して Amazon Managed Workflows for Apache Airflow のパフォーマンスを調整するために推奨するベストプラクティスについて説明します。
Apache Airflow 構成オプションの追加
以下の手順では、Airflow 構成オプションを環境に追加するステップを説明します。
-
Amazon MWAA コンソールで、環境ページを開きます。
-
環境を選択します。
-
[編集] を選択します。
-
[次へ] を選択します。
-
Airflow 構成オプション ペインで [カスタム構成を追加] を選択します。
-
ドロップダウンリストから構成を選択して値を入力するか、カスタム構成を入力して値を入力します。
-
追加する構成ごとに [カスタム構成を追加] を選択します。
-
[保存] を選択します。
詳細については、「Amazon MWAA での Apache Airflow 構成オプションの使用」を参照してください。
Apache Airflow スケジューラー
Apache Airflow スケジューラーは Apache Airflow のコアコンポーネントです。スケジューラーに問題があると、DAG の解析やタスクのスケジュール設定ができなくなる可能性があります。Apache Airflow スケジューラーの調整の詳細については、Apache Airflow ドキュメンテーションウェブサイトの「スケジューラーのパフォーマンスの微調整」を参照してください。
パラメータ
このセクションでは、Apache Airflow スケジューラーで使用できる構成オプションとそのユースケースについて説明します。
- Apache Airflow v2
-
バージョン |
設定オプション |
デフォルト |
説明 |
ユースケース |
v2
|
celery.sync_parallelism
|
1
|
Celery エグゼキュターがタスクの状態を同期するために使用するプロセスの数です。
|
このオプションを使用すると、Celery エグゼキュターが使用するプロセスを制限することでキューの競合を防ぐことができます。デフォルトでは、CloudWatch Logs にタスクログを配信する際にエラーが発生しないように値が 1 に設定されています。この値を 0 に設定すると最大数のプロセスを使用することになりますが、タスクログの配信時にエラーが発生する可能性があります。
|
v2
|
scheduler.idle_sleep_time
|
1
|
スケジューラーの「ループ」で DAG ファイルが連続して処理されるまでに待機する秒数。
|
このオプションを使用すると、DAG 解析結果の取得、タスクの検索とキューへの追加、エグゼキューターでのキュー内のタスクの実行が完了した後に、スケジューラーがスリープする時間を長くすることで、スケジューラーの CPU 使用率を解放できます。この値を増やすと、Apache Airflow v2 の場合は scheduler.parsing_processes で、Apache Airflow v1 の場合は scheduler.max_threads で、環境で実行されるスケジューラーのスレッド数が消費されます。これにより、スケジューラーが DAG を解析する容量が減り、DAG がウェブサーバーに表示されるまでにかかる時間が長くなる可能性があります。
|
v2
|
scheduler.max_dagruns_to_create_per_loop
|
10
|
スケジューラー「ループ」ごとに作成する DagRuns の DAG の最大数。
|
このオプションを使用すると、スケジューラーの「ループ」での DAGRun の最大数を減らすことで、タスクのスケジューリングに必要なリソースを解放できます。
|
v2
|
scheduler.parsing_processes
|
次の式を使用して設定します: デフォルトでは (2 * number of vCPUs) - 1 。
|
スケジューラーが DAG をスケジュールするために並列して実行できるスレッドの数。
|
このオプションを使用すると、スケジューラーが DAG を解析するために並列して実行するプロセスの数を減らすことで、リソースを解放できます。DAG の解析がタスクスケジューリングに影響している場合は、この数を低く抑えることをお勧めします。環境の vCPU 数よりも小さい値を指定する必要があります。詳細については、「制限」を参照してください。
|
制限
このセクションでは、スケジューラーのデフォルトパラメータを調整する際に考慮すべき制限について説明します。
- scheduler.parsing_processes、scheduler.max_threads
-
1 つの環境クラスの vCPU ごとに 2 つのスレッドを使用できます。環境クラスのスケジューラー用に少なくとも 1 つのスレッドを予約する必要があります。タスクのスケジュールが遅れていることに気付いた場合は、環境クラスを増やす必要があるかもしれません。たとえば、大規模な環境では、スケジューラ用に 4 vCPU の Fargate コンテナインスタンスがあります。つまり、他のプロセスに使用できるスレッドの 7
合計は最大数になります。つまり、2 つのスレッドに 4 つの vCPUs を乗算した値から、スケジューラー自体の 1 を引いた値です。scheduler.max_threads
および scheduler.parsing_processes
で指定する値は、環境クラスで使用できるスレッド数 (以下を参照) を超えてはなりません。
-
mw1.small — 他のプロセスの 1
スレッド数を超えてはいけません。残りのスレッドはスケジューラー用に予約されています。
-
mw1.medium — 他のプロセスの 3
スレッド数を超えてはいけません。残りのスレッドはスケジューラー用に予約されています。
-
mw1.large — 他のプロセスの 7
スレッド数を超えてはいけません。残りのスレッドはスケジューラー用に予約されています。
DAG フォルダー
Apache Airflow スケジューラーは、環境内の DAG フォルダーを継続的にスキャンします。含まれている plugins.zip
ファイル、または「airflow」インポートステートメントを含む Python (.py
) ファイル。生成されたすべての Python DAG オブジェクトは DagBag に格納され、そのファイルがスケジューラーによって処理され、スケジュールする必要があるタスク (ある場合) が決定されます。DAG ファイルの解析は、そのファイルに実行可能な DAG オブジェクトが含まれているかどうかに関係なく行われます。
パラメータ
このセクションでは、DAGs フォルダーで使用できる構成オプションとそのユースケースについて説明します。
- Apache Airflow v2
-
バージョン |
設定オプション |
デフォルト |
説明 |
ユースケース |
v2
|
scheduler.dag_dir_list_interval
|
300 秒
|
DAGs フォルダーをスキャンして新しいファイルを探す必要のある秒数。
|
このオプションを使用すると、DAGs フォルダーを解析する秒数を増やすことでリソースを解放できます。DAGs フォルダーに大量のファイルがあることが原因で、total_parse_time metrics で解析時間が長くなる場合は、この値を増やすことをお勧めします。
|
v2
|
scheduler.min_file_process_interval
|
30 秒
|
スケジューラーが DAG を解析して DAG への更新が反映されるまでの秒数。
|
このオプションを使用して、DAG を解析する前にスケジューラーが待機する秒数を増やすことで、リソースを解放できます。たとえば、30 の値を指定すると、DAG ファイルは 30 秒ごとに解析されます。お使いの環境の CPU 使用率を下げるには、この数値を高くしておくことをお勧めします。
|
DAG ファイル
Apache Airflow スケジューラーループの一部として、個々の DAG ファイルが解析されて DAG Python オブジェクトが抽出されます。Apache Airflow v2 以降では、スケジューラーは同時に最大数の解析プロセスを解析します。同じファイルが再び解析される前に、scheduler.min_file_process_interval
で指定した秒数が経過する必要があります。
パラメータ
このセクションでは、Apache Airflow DAG ファイルで使用できる構成オプションとそのユースケースについて説明します。
- Apache Airflow v2
-
バージョン |
設定オプション |
デフォルト |
説明 |
ユースケース |
v2
|
core.dag_file_processor_timeout
|
50 秒
|
DagFileProcessor が DAG ファイルの処理をタイムアウトするまでの秒数。
|
このオプションを使用すると、DAGFileProcessor がタイムアウトするまでの時間を増やすことでリソースを解放できます。DAG 処理ログにタイムアウトが発生し、実行可能な DAG が読み込まれなくなる場合は、この値を増やすことをお勧めします。
|
v2
|
core.dagbag_import_timeout
|
30 秒
|
Python ファイルをインポートするまでの秒数がタイムアウトします。
|
このオプションを使用すると、Python ファイルをインポートして DAG オブジェクトを抽出する際にスケジューラーがタイムアウトになるまでにかかる時間を増やすことで、リソースを解放できます。このオプションはスケジューラーの「ループ」の一部として処理され、core.dag_file_processor_timeout で指定されている値よりも小さい値が含まれている必要があります。
|
v2
|
core.min_serialized_dag_update_interval
|
30
|
データベース内のシリアル化された DAG が更新されるまでの最小秒数。
|
このオプションを使用すると、データベース内のシリアル化された DAG が更新されるまでの秒数を増やすことで、リソースを解放できます。DAG の数が多い場合、または複雑な DAG がある場合は、この値を増やすことをおすすめします。この値を増やすと、DAG がシリアル化されるときのスケジューラーとデータベースへの負荷が軽減されます。
|
v2
|
core.min_serialized_dag_fetch_interval
|
10
|
シリアル化された DAG が DAGBag に既に読み込まれているときに、データベースから再フェッチされる秒数。
|
このオプションを使用すると、シリアル化された DAG を再フェッチする秒数を増やすことでリソースを解放できます。データベースの「書き込み」速度を下げるには、この値を core.min_serialized_dag_update_interval で指定した値より大きくする必要があります。この値を増やすと、DAG がシリアル化される際のウェブサーバーとデータベースの負荷が軽減されます。
|
タスク
Apache Airflow スケジューラーとワーカーはどちらもタスクのキューイングとデキューに関与します。スケジューラーは、解析済みのスケジュール設定が完了したタスクを [なし] ステータスから [スケジュール済み] ステータスに移行します。同じく Fargate のスケジューラーコンテナーで実行されているエグゼキューターは、それらのタスクをキューに入れ、そのステータスを [キューで待機中] に設定します。ワーカーにキャパシティがあると、キューからタスクを取り出してステータスを [実行中] に設定し、その後、タスクが成功したか失敗したかに基づいてステータスを [成功] または [失敗] に変更します。
パラメータ
このセクションでは、Apache Airflow タスクで使用できる構成オプションとそのユースケースについて説明します。
Amazon MWAA がオーバーライドするデフォルトの構成オプションは赤
でマークされています。
- Apache Airflow v2
-
バージョン |
設定オプション |
デフォルト |
説明 |
ユースケース |
v2
|
core.parallelism
|
(maxWorkers * maxCeleryWorkers) / schedulers * 1.5 に基づいて動的に設定されます。
|
「実行中」のステータスを持つことができるタスクインスタンスの最大数。
|
このオプションを使用すると、同時に実行できるタスクインスタンスの数を増やすことでリソースを解放できます。指定する値は、使用可能なワーカーの数にワーカーのタスク密度を「掛けた」ものであるべきです。この値を変更するのは、多数のタスクが「実行中」または「キューで待機中」の状態で停止している場合のみにすることをおすすめします。
|
v2
|
core.dag_concurrency
|
10,000
|
各 DAG で同時に実行できるタスクインスタンスの数。
|
このオプションを使用すると、同時に実行できるタスクインスタンスの数を増やすことでリソースを解放できます。たとえば、10 個の並列タスクを含む 100 個の DAG があり、すべての DAG を同時に実行したい場合、利用可能なワーカーの数に celery.worker_concurrency でのワーカータスクの密度を「掛け」、それを DAG の数 (100 など) で割って最大並列処理を計算できます。
|
v2
|
core.execute_tasks_new_python_interpreter
|
True
|
Apache Airflow が親プロセスをフォークするか、新しい Python プロセスを作成してタスクを実行するかを決定します。 |
True に設定すると、Apache Airflow はプラグインに加えた変更を、タスクを実行するために作成された新しい Python プロセスとして認識します。
|
v2
|
celery.worker_concurrency
|
該当なし
|
Amazon MWAA は、このオプションの Airflow ベースインストールをオーバーライドして、自動スケーリングコンポーネントの一部としてワーカーをスケーリングします。
|
このオプションに指定された値はすべて無視されます。 |
v2
|
celery.worker_autoscale
|
mw1.small - 5,0
mw1.medium - 10,0
mw1.large - 20,0
mw1.xlarge - 40,0
mw1.2xlarge - 80,0
|
ワーカー向けのタスク同時実行性。
|
このオプションを使用すると、ワーカーの maximum 、minimum のタスク同時実行を減らすことでリソースを解放できます。ワーカーは、そのための十分なリソースがあるかどうかに関係なく、構成された maximum までの同時タスク受け入れます。十分なリソースがない状態でタスクをスケジュールすると、タスクはすぐに失敗します。リソースを大量に消費するタスクの場合は、タスクあたりの容量を増やすために値をデフォルトより小さくしてこの値を変更することをお勧めします。
|