トラブルシューティング: Apache Airflow v1 の DAGs、オペレータ、接続、およびその他の問題
このページのトピックには、Amazon Managed Workflows for Apache Airflow 環境で遭遇する可能性がある Apache Airflow v1.10.12 の Python 依存関係、カスタムプラグイン、DAG、オペレータ、接続、タスク、およびウェブサーバーの問題に対する解決策が含まれています。
requirements.txt の更新
以下のトピックでは、requirements.txt
を更新する際に受け取る可能性のあるエラーについて説明しています。
apache-airflow-providers-amazon
を追加すると、環境が失敗します。
apache-airflow-providers-
はApache Airflow v2 とのみ互換性があり、xyz
apache-airflow-backport-providers-
は Apache Airflow 1.10.12 と互換性があります。xyz
Broken DAG
次のトピックでは、DAGs の実行時に発生する可能性があるエラーについて説明します。
Amazon DynamoDB オペレーターを使用しているときに「DAG が壊れました」というエラーが表示されました
次のステップを推奨します。
-
GitHub の「aws-mwaa-local-runner
」を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。 -
以下のパッケージを
requirements.txt
に追加します。boto
-
requirements.txt
ファイルに Python の依存関係を指定する方法については、requirements.txt での Python 依存関係の管理 を参照してください。
「壊れた DAG:psycopg2 という名前のモジュールはありません」というエラーが表示されました。
次のステップを推奨します。
-
GitHub の aws-mwaa-local-runner
を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。 -
requirements.txt
Apache Airflowのバージョンに次の項目を追加します。例:apache-airflow[postgres]==1.10.12
-
requirements.txt
ファイルに Python の依存関係を指定する方法については、requirements.txt での Python 依存関係の管理 を参照してください。
Slack オペレータを使用しているときに「DAG が壊れました」というエラーが表示されました。
次のステップを推奨します。
-
GitHub の aws-mwaa-local-runner
を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。 -
以下のパッケージを
requirements.txt
に追加し、Apache Airflow のバージョンを指定してください。例:apache-airflow[slack]==1.10.12
-
requirements.txt
ファイルに Python の依存関係を指定する方法については、requirements.txt での Python 依存関係の管理 を参照してください。
Google/GCP/BigQuery のインストール中に、さまざまなエラーが発生しました。
Amazon MWAA は Amazon Linux を使用しているため、特定のバージョンの Cython と暗号化ライブラリが必要です。次のステップを推奨します。
-
GitHub の「aws-mwaa-local-runner
」を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。 -
以下のパッケージを
requirements.txt
に追加します。grpcio==1.27.2 cython==0.29.21 pandas-gbq==0.13.3 cryptography==3.3.2 apache-airflow-backport-providers-amazon[google]
-
リバース・ポート・プロバイダを使用しない場合は、次の機能を使用できます:
grpcio==1.27.2 cython==0.29.21 pandas-gbq==0.13.3 cryptography==3.3.2 apache-airflow[gcp]==1.10.12
-
requirements.txt
ファイルに Python の依存関係を指定する方法については、requirements.txt での Python 依存関係の管理 を参照してください。
「DAG が壊れました:Cython という名前のモジュールはありません」というエラーが表示されました。
Amazon MWAA は、特定のバージョンの Cython を必要とする Amazon Linux を使用しています。次のステップを推奨します。
-
GitHub の「aws-mwaa-local-runner
」を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。 -
以下のパッケージを
requirements.txt
に追加します。cython==0.29.21
-
Cython ライブラリには、必要なさまざまな pip 依存バージョンがあります。例えば、
awswrangler==2.4.0
を使用するためにはpyarrow<3.1.0,>=2.0.0
が必要ですので、pip3 がpyarrow==3.0.0
をインストールしようとして Broken DAG エラーが発生します。許容できる、最も古いバージョンを明示的に指定することをおすすめします。例えば、最小値pyarrow==2.0.0
をawswrangler==2.4.0
よりも前に指定すると、エラーは解消され、requirements.txt
が正しくインストールされます。最終的な要件は、以下のようになります。cython==0.29.21 pyarrow==2.0.0 awswrangler==2.4.0
-
requirements.txt
ファイルに Python の依存関係を指定する方法については、「requirements.txt での Python 依存関係の管理」を参照してください。
演算子
次のトピックでは、Operators を使用するときに受け取る可能性のあるエラーについて説明します。
BigQuery オペレータの使用中にエラーが発生しました。
Amazon MWAA は UI 拡張機能があるオペレータをサポートしていません。次のステップを推奨します。
-
GitHub の aws-mwaa-local-runner
を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。 -
回避策は、問題のオペレータをインポートした後に
<operator name>.operator_extra_links = None
を設定するために、DAG に行を追加して拡張機能を上書きすることです。例:from airflow.contrib.operators.bigquery_operator import BigQueryOperator BigQueryOperator.operator_extra_links = None
-
上記をプラグインに追加することで、このアプローチをすべての DAG に使用できます。例については、Apache Airflow Python VirtualEnv オペレータ用のカスタムプラグインを作成する を参照してください。
接続
以下のトピックでは、Apache Airflow の接続を使用する際や別の AWS データベースを使用する際に受け取る可能性のあるエラーについて説明しています。
Snowflake に接続できません。
次のステップを推奨します。
-
GitHub の aws-mwaa-local-runner
を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。 -
ご使用の環境に適した requirements.txt に次のエントリを追加します。
asn1crypto == 0.24.0 snowflake-connector-python == 1.7.2
-
以下のインポートを DAG に追加する:
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
Apache Airflow 接続オブジェクトに、次のキーバリューのペアが含まれていることを確認します。
-
接続 ID: snowflake_conn
-
コーンタイプ: スノーフレーク
-
ホスト: <my account> <my region if not us-west-2>.snowflakecomputing.com
-
スキーマ: <my schema>
-
ログイン: <my user name>
-
パスワード: ********
-
ポート: <port, if any>
-
エキストラ:
{ "account": "<my account>", "warehouse": "<my warehouse>", "database": "<my database>", "region": "<my region if not using us-west-2 otherwise omit this line>" }
例:
>>> import json >>> from airflow.models.connection import Connection >>> myconn = Connection( ... conn_id='snowflake_conn', ... conn_type='Snowflake', ... host='
YOUR_ACCOUNT
.YOUR_REGION
.snowflakecomputing.com', ... schema='YOUR_SCHEMA
' ... login='YOUR_USERNAME
', ... password='YOUR_PASSWORD
', ... port='YOUR_PORT
' ... extra=json.dumps(dict(account='YOUR_ACCOUNT
', warehouse='YOUR_WAREHOUSE
', database='YOUR_DB_OPTION
', region='YOUR_REGION
')), ... )
Secrets Manager に接続できません。
次のステップを推奨します。
-
Apache Airflow 接続と変数のシークレットキーを作成する方法を Apache Airflow 接続を秘密 AWS Secrets Manager を使用して構成する で学習できます。
-
test-variable
で ApacheAirflow 変数 (Apache Airflow 変数の AWS Secrets Manager におけるシークレットキーの使用) のシークレットキーを使用する方法を学習します。 -
AWS Secrets Manager の Apache Airflow 接続でのシークレットキーの使用 で ApacheAirflow 接続 (
myconn
) のシークレットキーを使用する方法を学習します。
「<DB-identifier-name>.cluster-id.rds.amazonaws.com」 の MySQL サーバに接続できません。
Amazon MWAA のセキュリティグループと RDS セキュリティグループには、トラフィックが相互に行き交うことを可能にする入口が必要です。次のステップを推奨します。
-
Amazon MWAA の VPC セキュリティグループからのトラフィックをすべて許可するように RDS セキュリティグループを変更します。
-
Amazon MWAA の VPC セキュリティグループを変更し、RDS セキュリティグループからのすべてのトラフィックを許可します。
-
タスクを再実行し、CloudWatch Logs の Apache Airflow ログをチェックして、 SQL クエリが成功したかどうかを確認します。
ウェブサーバ
次のトピックでは、Amazon MWAA 上の Apache Airflow ウェブサーバーで発生する可能性のあるエラーについて説明します。
BigQueryOperator を使用しており、そのせいで web サーバーがクラッシュした。
次のステップを推奨します。
-
BigQueryOperator
やQuboleOperator
などのoperator_extra_links
を含む Apache Airflow オペレータを使用すると、Apache Airflow ウェブサーバーがクラッシュすることがあります。これらのオペレータは Web サーバにコードをロードしようとしますが、これはセキュリティ上の理由から許可されていません。インポートステートメント後に次のコードを追加して、DAG 内のオペレータにパッチを適用することをお勧めします。BigQueryOperator.operator_extra_links = None
-
GitHub の aws-mwaa-local-runner
を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。
ウェブサーバーにアクセスすると、5xx エラーが表示されます。
次のステップを推奨します。
-
Apache Airflow 構成オプションを確認してください。Apache Airflow 構成オプションとして指定したキーと値のペア(例: AWS Secrets Manager )が正しく設定されていることを確認してください。詳細については、Secrets Manager に接続できません。 を参照してください。
-
requirements.txt
をチェックしてください。Apache Airflow バージョンと互換性のある、requirements.txt
にリストされている Airflow の「extras」パッケージおよびその他のライブラリを確認してください。 -
requirements.txt
ファイルに Python の依存関係を指定する方法については、requirements.txt での Python 依存関係の管理 を参照してください。
「スケジューラーは実行されていないようです」というエラー表示があります。
スケジューラが実行されていないように見える場合や、最後の「ハートビート」が数時間前に受信された場合は、DAGs が Apache Airflow に表示されず、新しいタスクがスケジューリングされない可能性があります。
次のステップを推奨します。
-
VPC セキュリティグループがポート
5432
へのインバウンドアクセスを許可していることを確認します。これは、ご使用の環境の Amazon Aurora PostgreSQL メタデータデータベースに接続するために必要のポートです。このルールを追加したら、 Amazon MWAA を数分間待つとエラーは消えるはずです。詳細については、Amazon MWAA の VPC のセキュリティ を参照してください。注記
-
Aurora PostgreSQL メタデータベースは Amazon MWAA サービスアーキテクチャ の一部であり、AWS アカウント では表示されません。
-
データベース関連のエラーは通常、スケジューラー障害の症状であり、根本的な原因ではありません。
-
-
スケジューラーが動作していない場合は、依存関係のインストールの失敗 や スケジューラーの過負荷 など、さまざまな要因が原因である可能性があります。CloudWatch Logs で対応するロググループを表示して、DAGs、プラグイン、および要件が正しく機能していることを確認します。詳細については、Amazon Managed Workflows for Apache Airflow のモニタリングとメトリクス を参照してください。
タスク
次のトピックでは、Apache Airflow ログを表示した際に発生する可能性があるエラーについて説明します。
タスクが行き詰まっいるか、完了していません。
Apache Airflow タスクが「行き詰まっている」か、完了していない場合は、次のステップを実行することをお勧めします。
-
多数の DAG が定義されている可能性があります。DAG の数を減らし、環境の更新 (ログレベルの変更など) を実行して強制的にリセットしてください。
-
Airflow は DAG が有効かどうかに関係なく解析します。環境の 50% を超える容量を使用していると、Apache Airflow スケジューラに負荷がかかり始める可能性があります。これにより、CloudWatch メトリクスの合計解析時間が長くなったり、CloudWatCloudWatch Logs の DAG 処理時間が長くることがあります。Apache Airflow の設定を最適化する方法は他にもありますが、このガイドの対象範囲には含まれていません。
-
ご使用の環境のパフォーマンスを調整するために推奨するベスト・プラクティスの詳細については、「Amazon MWAA での Apache Airflow のパフォーマンス調整」 を参照してください。
-
-
キューには、多数のタスクがある可能性があります。これは通常、「なし」状態のタスクが多数発生しているか、CloudWatch でキューに入れられたタスクや保留中のタスクが多数発生している場合に発生します。これは以下のような原因で起こりうる:
-
実行するタスクの数が環境の実行能力を超えている場合や、自動スケーリング前にキューに入れられたタスクの数が多い場合は、タスクを検出して追加のワーカー.をデプロイする時間があります。
-
実行するタスクの数が、実行可能な環境よりも多い場合は、DAG が同時に実行するタスクの数を減らすか、Apache Airflow ワーカーの最小数を増やすことをお勧めします。
-
自動スケーリングで追加のワーカーを検出してデプロイする時間ができる前に、大量のタスクがキューに入れられている場合は、タスクの配備をずらすか、最小 Apache Airflow ワーカスレッドを増やすことをお勧めします。
-
AWS Command Line Interface (AWS CLI) の update-environment コマンドを使用して、環境で実行されるワーカーの最小数または最大数を変更できます。
aws mwaa update-environment --name MyEnvironmentName --min-workers 2 --max-workers 10
-
ご使用の環境のパフォーマンスを調整するために推奨するベスト・プラクティスの詳細については、Amazon MWAA での Apache Airflow のパフォーマンス調整 を参照してください。
-
-
タスクログとして表示され、それ以上の指示なしに Apache Airflow で停止したタスクが、実行中に削除された可能性があります。これは以下のような原因で起こりうる:
-
もし、1)現在のタスクが現在の環境のキャパシティを超え、2)数分間、タスクが実行されなかったり、キューに入れられなかったりした後、3)新しいタスクがキューに入れられたりする瞬間があります。
-
Amazon MWAA 自動スケーリングは、最初のシナリオに応じてワーカーを追加します。2 番目のシナリオでは、追加のワーカーが削除されます。キューに入っているタスクの中には、ワーカーが削除される過程にあり、コンテナが削除された時点で終了するものもあります。
-
環境内の最小ワーカー数を増やすことをお勧めします。もう 1 つの選択肢は、DAG とタスクの時間を調整して、これらの状況が発生しないようにすることです
-
また、最小ワーカー数を環境内の最大ワーカー数と同じようにに設定して、オートスケーリングを効果的に無効にすることもできます。AWS Command Line Interface (AWS CLI) の update-environment コマンドを使用して、ワーカーの最小数と最大数を同じに設定して自動スケーリングを無効にします。
aws mwaa update-environment --name MyEnvironmentName --min-workers 5 --max-workers 5
-
ご使用の環境のパフォーマンスを調整するために推奨するベスト・プラクティスの詳細については、Amazon MWAA での Apache Airflow のパフォーマンス調整 を参照してください。
-
-
タスクが「実行中」状態のままになっている場合は、タスクをクリアしたり、成功または失敗のマークを付けることもできます。これにより、環境の自動スケーリングコンポーネントは、環境で実行されているワーカー数をスケールダウンできます。次の図は、取り残されたタスクの例です。
-
取り残されたタスクの円を選択し、クリアを選択します(図を参照)。これにより Amazon MWAA はワーカーをスケールダウンできます。そうしない場合、Amazon MWAA はどの DAG が有効か無効かを判断できず、キューにタスクが残っている場合はスケールダウンできません。
-
-
Apache Airflow タスクライフサイクルの詳細については、Apache Airflow リファレンスガイドの 「概念」
を参照してください。
CLI
次のトピックでは、AWS Command Line Interface で Airflow CLI コマンドを実行したときに発生する可能性があるエラーについて説明します。
CLI で DAG をトリガーすると「503」エラーが表示されます
Airflow CLI は Apache Airflow ウェブサーバー上でで実行され、同時実行性が制限されています。通常、 CLI コマンドを最大 4 つ同時に実行できます。