Usando o Apache Airflow REST API - Amazon Managed Workflows for Apache Airflow

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Usando o Apache Airflow REST API

O Amazon Managed Workflows for Apache Airflow (AmazonMWAA) suporta a interação com seus ambientes Apache Airflow diretamente usando o Apache Airflow REST API para ambientes que executam o Apache Airflow v2.4.3 e superior. Isso permite que você acesse e gerencie seus MWAA ambientes Amazon de forma programática, fornecendo uma forma padronizada de invocar fluxos de trabalho de orquestração de dados, gerenciar seus e monitorar o status de vários componentes do Apache AirflowDAGs, como banco de dados de metadados, acionador e agendador.

Para oferecer suporte direto ao uso do Apache Airflow, a REST API Amazon MWAA oferece a opção de escalar horizontalmente a capacidade do servidor web para lidar com o aumento da demanda, seja de REST API solicitações, uso da interface de linha de comando (CLI) ou mais usuários simultâneos da interface de usuário (UI) do Apache Airflow. Para obter mais informações sobre como a Amazon MWAA escala servidores web, consulteConfigurando a escalabilidade automática do servidor web Amazon MWAA.

Você pode usar o Apache Airflow REST API para implementar os seguintes casos de uso em seus ambientes:

  • Acesso programático — Agora você pode iniciar as DAG execuções do Apache Airflow, gerenciar conjuntos de dados e recuperar o status de vários componentes, como banco de dados de metadados, acionadores e agendadores, sem depender da interface do usuário do Apache Airflow ou. CLI

  • Integre-se com aplicativos externos e microsserviços — o REST API suporte permite que você crie soluções personalizadas que integram seus MWAA ambientes Amazon com outros sistemas. Por exemplo, você pode iniciar fluxos de trabalho em resposta a eventos de sistemas externos, como trabalhos concluídos no banco de dados ou inscrições de novos usuários.

  • Monitoramento centralizado — Você pode criar painéis de monitoramento que agregam o status do seu em DAGs vários MWAA ambientes da Amazon, permitindo monitoramento e gerenciamento centralizados.

Os tópicos a seguir mostram como você obtém um token de acesso ao servidor web e, em seguida, usa esse token para fazer API chamadas para o Apache REST API Airflow. No exemplo a seguir, você chamará o API para iniciar uma nova DAG execução.

Para obter mais informações sobre o Apache Airflow RESTAPI, consulte The Apache Airflow Reference. REST API

Crie um token de sessão do servidor web

Para criar um token de acesso ao servidor web, use a função Python a seguir. Essa função primeiro chama a Amazon MWAA API para obter um token de login na web. O token de login na web, que expira após 60 segundos, é então trocado por um token de sessão na web, que permite acessar o servidor web e usar o Apache Airflow. REST API

nota

O token da sessão expira após 12 horas.

def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

Ligue para o Apache Airflow REST API

Depois que a autenticação for concluída, você terá as credenciais para começar a enviar solicitações aos API endpoints. No exemplo abaixo, use o endpoint/dags/dag_id/dag.

def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)