Invocación de DAG con una función de Lambda - Amazon Managed Workflows para Apache Airflow

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Invocación de DAG con una función de Lambda

El siguiente código de ejemplo utiliza una función AWS Lambda para obtener un token CLI de Apache Airflow e invocar un gráfico acíclico dirigido (DAG) en un entorno de Amazon MWAA.

Versión

  • Puede usar el código de ejemplo que aparece en esta página con Apache Airflow v2 en Python 3.10.

Requisitos previos

Para utilizar este ejemplo de código, debe:

nota

Si la función de Lambda y su entorno Amazon MWAA están en la misma VPC, puede usar este código en una red privada. Para esta configuración, el rol de ejecución de la función de Lambda necesita permiso para llamar a la operación de la API de CreateNetworkInterface de Amazon Elastic Compute Cloud (Amazon EC2). Puede proporcionar este permiso mediante la política administrada de AWSLambdaVPCAccessExecutionRole de AWS.

Permisos

Para usar el ejemplo de código de esta página, el rol de ejecución de su entorno Amazon MWAA necesita acceso para realizar la acción airflow:CreateCliToken. Puede proporcionar este permiso mediante la política administrada AmazonMWAAAirflowCliAccess de AWS.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

Para obtener más información, consulte CLIPolítica de Apache Airflow: A mazonMWAAAirflow CliAccess.

Dependencias

Ejemplo de código

  1. Abra la consola de AWS Lambda en https://console.aws.amazon.com/lambda/.

  2. Elija su función de Lambda en la lista de funciones.

  3. En la página de funciones, copie el código siguiente y sustituya lo siguiente por los nombres de sus recursos:

    • YOUR_ENVIRONMENT_NAME: el nombre del entorno de Amazon MWAA.

    • YOUR_DAG_NAME: el nombre del DAG que desea invocar.

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. Elija Implementar.

  5. Elija Probar para invocar la función mediante la consola Lambda.

  6. Para comprobar que su Lambda ha invocado correctamente su DAG, utilice la consola Amazon MWAA para navegar hasta la interfaz de usuario de Apache Airflow de su entorno y, a continuación, haga lo siguiente:

    1. En la página DAG, busque su nuevo DAG de destino en la lista de DAG.

    2. En Última ejecución, compruebe la marca de tiempo de la última ejecución del DAG. Esta marca de tiempo debe acercarse lo máximo posible a la última marca de tiempo para invoke_dag en su otro entorno.

    3. En Tareas recientes, compruebe que la última ejecución se haya realizado correctamente.