Uso da API REST do Apache Airflow
O Amazon Managed Workflows for Apache Airflow (Amazon MWAA) oferece suporte à interação com os ambientes do Apache Airflow diretamente usando a API REST do Apache Airflow para ambientes que executam o Apache Airflow v2.4.3 e superior. Isso permite acessar e gerenciar ambientes do Amazon MWAA de forma programática, fornecendo uma forma padronizada de invocar fluxos de trabalho de orquestração de dados, gerenciar DAGs e monitorar o status de vários componentes do Apache Airflow, como o banco de dados de metadados, o acionador e o programador.
Para oferecer suporte à escalabilidade ao usar a API REST do Apache Airflow, o Amazon MWAA oferece a opção de escalar horizontalmente a capacidade do servidor Web para lidar com o aumento da demanda, seja de solicitações da API REST, de uso da interface de linha de comandos (CLI) ou de mais usuários simultâneos da interface de usuário (UI) do Apache Airflow. Para obter mais informações sobre como o Amazon MWAA escala servidores Web, consulte Como configurar o ajuste de escala automático do servidor Web do Amazon MWAA.
Você pode usar a API REST do Apache Airflow para implementar os seguintes casos de uso nos ambientes:
-
Acesso programático: agora você pode iniciar as execuções de DAG do Apache Airflow, gerenciar conjuntos de dados e recuperar o status de vários componentes, como banco de dados de metadados, acionadores e programadores, sem depender da interface de usuário ou da CLI do Apache Airflow.
-
Integre-se com aplicações e microsserviços externos: o suporte à API REST permite criar soluções personalizadas que integram seus ambientes do Amazon MWAA com outros sistemas. Por exemplo, você pode iniciar fluxos de trabalho em resposta a eventos de sistemas externos, como trabalhos concluídos no banco de dados ou inscrições de novos usuários.
-
Monitoramento centralizado: você pode criar painéis de monitoramento que agregam o status de DAGs em vários ambientes do Amazon MWAA, permitindo monitoramento e gerenciamento centralizados.
Para obter mais informações sobre a API REST do Apache Airflow, consulte a referência da API REST do Apache Airflow
Você pode acessar a API REST do Apache Airflow usando credenciais da AWS. Como alternativa, você também pode acessá-la obtendo um token de acesso ao servidor Web e usando o token para chamá-la.
Os exemplos a seguir mostram como fazer chamadas de API para a API REST do Apache Airflow e iniciar uma nova execução de DAG:
Tópicos
Concessão de acesso à API REST do Apache Airflow: airflow:InvokeRestApi
Para acessar a API REST do Apache Airflow usando a credencial da AWS, você deve conceder a permissão airflow:InvokeRestApi
na política do IAM. No exemplo de política a seguir, especifique o Admin
, Op
, User
, Viewer
ou a função Public
em {airflow-role}
para personalizar o nível de acesso do usuário. Para obter mais informações, consulte Perfis padrão
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowMwaaRestApiAccess", "Effect": "Allow", "Action": "airflow:InvokeRestApi", "Resource": [ "arn:aws:airflow:{your-region}:YOUR_ACCOUNT_ID:role/{your-environment-name}/{airflow-role}" ] } ] }
nota
Ao configurar um servidor Web privado, a ação InvokeRestApi
não pode ser invocada de fora de uma nuvem privada virtual (VPC). Você pode usar a chave aws:SourceVpc
para aplicar um controle de acesso mais granular para essa operação. Para obter mais informações, consulte aws:SourceVpc.
Como chamar a API REST do Apache Airflow
O seguinte exemplo de script aborda como usar a API REST do Apache Airflow para listar os DAGs disponíveis no ambiente e como criar uma variável do Apache Airflow:
import boto3 env_name = "MyAirflowEnvironment" def list_dags(client): request_params = { "Name": env_name, "Path": "/dags", "Method": "GET", "QueryParameters": { "paused": False } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) def create_variable(client): request_params = { "Name": env_name, "Path": "/variables", "Method": "POST", "Body": { "key": "test-restapi-key", "value": "test-restapi-value", "description": "Test variable created by MWAA InvokeRestApi API", } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) if __name__ == "__main__": client = boto3.client("mwaa") list_dags(client) create_variable(client)
Como criar um token de sessão do servidor Web e chamar a API REST do Apache Airflow
Para criar um token de acesso ao servidor Web, use a função Python a seguir. Essa função primeiro chama a API do Amazon MWAA para obter um token de login da Web. O token de login da Web, que expira após 60 segundos, é então trocado por um token de sessão da Web, que permite acessar o servidor Web e usar a API REST do Apache Airflow. Caso precise de mais de 10 transações por segundo (TPS) de capacidade de controle de utilização, poderá usar esse método para acessar a API REST do Apache Airflow.
nota
O token da sessão expira após 12 horas.
def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None
Depois que a autenticação for concluída, você terá as credenciais para começar a enviar solicitações aos endpoints da API. No exemplo abaixo, use o endpoint /dags/
. dag_id
/dag
def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)