Utilisation de l'Apache Airflow REST API - Amazon Managed Workflows for Apache Airflow

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation de l'Apache Airflow REST API

Amazon Managed Workflows for Apache Airflow (AmazonMWAA) permet d'interagir avec vos environnements Apache Airflow directement à l'aide d'Apache Airflow REST API pour les environnements exécutant Apache Airflow v2.4.3 et versions ultérieures. Cela vous permet d'accéder à vos MWAA environnements Amazon et de les gérer par programmation, en fournissant un moyen standardisé d'invoquer des flux de travail d'orchestration de données, de gérer vos différents composants d'Apache Airflow et de surveiller leur étatDAGs, tels que la base de données de métadonnées, le déclencheur et le planificateur.

Afin de garantir l'évolutivité lors de l'utilisation d'Apache Airflow, REST API Amazon vous MWAA offre la possibilité de dimensionner horizontalement la capacité du serveur Web pour faire face à une demande accrue, qu'elle soit due à des REST API demandes, à l'utilisation de l'interface de ligne de commande (CLI) ou à un plus grand nombre d'utilisateurs simultanés de l'interface utilisateur (UI) Apache Airflow. Pour plus d'informations sur la manière dont Amazon MWAA fait évoluer les serveurs Web, consultezConfiguration du dimensionnement automatique du serveur MWAA Web Amazon.

Vous pouvez utiliser Apache Airflow REST API pour implémenter les cas d'utilisation suivants pour vos environnements :

  • Accès par programmation — Vous pouvez désormais démarrer les DAG exécutions d'Apache Airflow, gérer des ensembles de données et récupérer l'état de divers composants tels que la base de données de métadonnées, les déclencheurs et les planificateurs sans avoir recours à l'interface utilisateur d'Apache Airflow ou. CLI

  • Intégration à des applications externes et à des microservices : le REST API support vous permet de créer des solutions personnalisées qui intègrent vos MWAA environnements Amazon à d'autres systèmes. Par exemple, vous pouvez démarrer des flux de travail en réponse à des événements provenant de systèmes externes, tels que des tâches de base de données terminées ou l'inscription de nouveaux utilisateurs.

  • Surveillance centralisée : vous pouvez créer des tableaux de bord de surveillance qui regroupent votre statut DAGs sur plusieurs MWAA environnements Amazon, permettant ainsi une surveillance et une gestion centralisées.

Pour plus d'informations sur Apache Airflow RESTAPI, consultez The Apache Airflow Reference REST API.

En utilisantInvokeRestApi, vous pouvez accéder à Apache Airflow à l'RESTAPIaide d' AWS informations d'identification. Vous pouvez également y accéder en obtenant un jeton d'accès au serveur Web, puis en utilisant le jeton pour l'appeler.

Note
  • Si vous rencontrez une erreur avec le message « Update your environment to use InvokeRestApi » lors de l'utilisation de l'InvokeRestApiopération, cela indique que vous devez mettre à jour votre MWAA environnement Amazon. Cette erreur se produit lorsque votre MWAA environnement Amazon n'est pas compatible avec les dernières modifications liées à InvokeRestApi cette fonctionnalité. Pour résoudre ce problème, mettez à jour votre MWAA environnement Amazon afin d'intégrer les modifications nécessaires à la InvokeRestApi fonctionnalité.

  • Le délai d'expiration par défaut de l'InvokeRestApiopération est de 10 secondes. Si l'opération ne se termine pas dans ce délai de 10 secondes, elle sera automatiquement interrompue et une erreur sera générée. Assurez-vous que vos REST API appels sont conçus pour être terminés dans ce délai afin d'éviter toute erreur.

Les exemples suivants montrent comment API appeler Apache Airflow REST API et démarrer une nouvelle DAG exécution :

Octroi de l'accès à Apache Airflow REST API : airflow:InvokeRestApi

Pour accéder à Apache Airflow à REST API l'aide AWS d'informations d'identification, vous devez accorder l'airflow:InvokeRestApiautorisation dans votre IAM politique. Dans l'exemple de politique suivant, spécifiez le AdminOp,User, Viewer ou le Public rôle dans {airflow-role} pour personnaliser le niveau d'accès des utilisateurs. Pour plus d'informations, consultez la section Rôles par défaut dans le guide de référence d'Apache Airflow.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowMwaaRestApiAccess", "Effect": "Allow", "Action": "airflow:InvokeRestApi", "Resource": [ "arn:aws:airflow:{your-region}:YOUR_ACCOUNT_ID:role/{your-environment-name}/{airflow-role}" ] } ] }
Note

Lors de la configuration d'un serveur Web privé, l'InvokeRestApiaction ne peut pas être invoquée depuis l'extérieur d'un cloud privé virtuel (VPC). Vous pouvez utiliser la aws:SourceVpc clé pour appliquer un contrôle d'accès plus précis à cette opération. Pour plus d'informations, consultez aws : SourceVpc.

Appeler l'Apache Airflow REST API

L'exemple de script suivant explique comment utiliser Apache Airflow REST API pour répertorier les éléments disponibles DAGs dans votre environnement et comment créer une variable Apache Airflow :

import boto3 env_name = "MyAirflowEnvironment" def list_dags(client): request_params = { "Name": env_name, "Path": "/dags", "Method": "GET", "QueryParameters": { "paused": False } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) def create_variable(client): request_params = { "Name": env_name, "Path": "/variables", "Method": "POST", "Body": { "key": "test-restapi-key", "value": "test-restapi-value", "description": "Test variable created by MWAA InvokeRestApi API", } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) if __name__ == "__main__": client = boto3.client("mwaa") list_dags(client) create_variable(client)

Création d'un jeton de session de serveur Web et appel d'Apache Airflow REST API

Pour créer un jeton d'accès au serveur Web, utilisez la fonction Python suivante. Cette fonction appelle d'abord Amazon MWAA API pour obtenir un jeton de connexion Web. Le jeton de connexion Web, qui expire au bout de 60 secondes, est ensuite échangé contre un jeton de session Web, qui vous permet d'accéder au serveur Web et d'utiliser Apache Airflow RESTAPI. Si vous avez besoin de plus de 10 transactions par seconde (TPS) de capacité de régulation, vous pouvez utiliser cette méthode pour accéder à Apache Airflow. REST API

Note

Le jeton de session expire au bout de 12 heures.

def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

Une fois l'authentification terminée, vous disposez des informations d'identification nécessaires pour commencer à envoyer des demandes aux API points de terminaison. Dans l'exemple ci-dessous, utilisez le point de terminaisondags/{dag_id}/dagRuns.

def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)