使用 Lambda 函数调用 DAG - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Lambda 函数调用 DAG

以下代码示例使用 AWS Lambda 函数获取 Apache Airflow CLI 令牌并在 Amazon MWAA 环境中调用有向无环图(DAG)。

版本

  • 您可以在 Python 3.10 中将本页上的代码示例与 Apache Airflow v2 一起使用。

先决条件

要使用此代码示例,您必须:

注意

如果 Lambda 函数和 Amazon MWAA 环境处于同一 VPC 中,则可以在私有网络上使用此代码。对于本配置,Lambda 函数的执行角色需要获得调用 Amazon Elastic Compute Cloud(Amazon EC2)CreateNetworkInterface API 操作的权限。您可以使用 AWSLambdaVPCAccessExecutionRole AWS 托管策略添加此权限。

权限

要使用本页上的代码示例,Amazon MWAA 环境的执行角色需要访问权限才能执行 airflow:CreateCliToken 操作。您可以使用 AmazonMWAAAirflowCliAccess AWS 托管策略添加此权限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

有关更多信息,请参阅 Apache Airflow CLI 策略:A mazonMWAAAirflow CliAccess

依赖项

代码示例

  1. 通过 https://console.aws.amazon.com/lambda/ 打开 AWS Lambda 控制台。

  2. Functions 列表中选择 Lambda 函数。

  3. 在函数页面上,复制以下代码并将以下代码替换为资源名称:

    • YOUR_ENVIRONMENT_NAME – Amazon MWAA 环境名称。

    • YOUR_DAG_NAME – 您想调用的 DAG 名称。

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. 选择部署

  5. 选择测试,使用 Lambda 控制台调用函数。

  6. 要验证 Lambda 是否成功调用了 DAG,请使用 Amazon MWAA 控制台导航到环境的 Apache Airflow UI 界面,然后执行以下操作:

    1. DAG 页面上,在 DAG 列表中找到新的目标 DAG。

    2. 上次运行下,查看最新 DAG 运行的时间戳。此时间戳应与您其他环境中 invoke_dag 的最新时间戳非常匹配。

    3. 近期任务下,检查上次运行是否成功。