Apache Airflow の使用 REST API - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Apache Airflow の使用 REST API

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) は、Apache Airflow v2.4.3 以降を実行している環境RESTAPIに対して、Apache Airflow を使用して Apache Airflow 環境を直接操作することをサポートしています。これにより、Amazon MWAA環境にプログラムでアクセスおよび管理し、データオーケストレーションワークフローを呼び出し、 を管理しDAGs、メタデータデータベース、トリガー、スケジューラなどのさまざまな Apache Airflow コンポーネントのステータスをモニタリングするための標準化された方法を提供できます。

Apache Airflow の を直接使用できるようにAPI、Amazon MWAAでは、RESTAPIリクエスト、コマンドラインインターフェイス (CLI) REST の使用、または Apache Airflow ユーザーインターフェイス (UI) の同時ユーザーなど、需要の増加に対応するためにウェブサーバー容量を水平方向にスケーリングするオプションが用意されています。Amazon がウェブサーバーをMWAAスケーリングする方法の詳細については、「」を参照してくださいAmazon MWAA ウェブサーバーの自動スケーリングの設定

Apache Airflow を使用してRESTAPI、環境に次のユースケースを実装できます。

  • プログラムによるアクセス — Apache Airflow UI や に頼ることなく、Apache Airflow DAGの実行を開始したり、データセットを管理したり、メタデータデータベース、トリガー、スケジューラなどのさまざまなコンポーネントのステータスを取得したりできるようになりましたCLI。

  • 外部アプリケーションやマイクロサービスとの統合 – REST API サポートにより、Amazon MWAA環境を他のシステムと統合するカスタムソリューションを構築できます。例えば、完了したデータベースジョブや新しいユーザーのサインアップなど、外部システムからのイベントに応じてワークフローを開始できます。

  • 一元化されたモニタリング – 複数の Amazon MWAA環境DAGsにわたる のステータスを集約するモニタリングダッシュボードを構築して、一元的なモニタリングと管理を可能にします。

以下のトピックでは、ウェブサーバーアクセストークンを取得し、そのトークンを使用して Apache Airflow をAPI呼び出す方法を示しますRESTAPI。次の例では、 を呼び出しAPIて新しいDAG実行を開始します。

Apache Airflow の詳細についてはAPI、REST「Apache Airflow RESTAPIリファレンス」を参照してください。

ウェブサーバーセッショントークンを作成する

ウェブサーバーアクセストークンを作成するには、次の Python 関数を使用します。この関数は、まず Amazon を呼び出しMWAAAPIてウェブログイントークンを取得します。60 秒後に期限切れになるウェブログイントークンは、ウェブセッショントークンと交換されます。これにより、ウェブサーバーにアクセスして Apache Airflow を使用できますRESTAPI。

注記

セッショントークンは 12 時間後に期限切れになります。

def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

Apache Airflow を呼び出す REST API

認証が完了すると、APIエンドポイントへのリクエストの送信を開始するための認証情報が取得されます。以下の例では、エンドポイント を使用します/dags/dag_id/dag

def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)