本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Lambda 函数调用 DAG
以下代码示例使用 AWS Lambda 函数获取 Apache Airflow CLI 令牌并在 Amazon MWAA 环境中调用有向无环图(DAG)。
版本
-
您可以在 Python 3.10
中将本页上的代码示例与 Apache Airflow v2 一起使用。
先决条件
要使用此代码示例,您必须:
-
使用 Amazon MWAA 环境公共网络访问模式。
-
使用最新的 Python 运行时创建一个 Lambda 函数。
注意
如果 Lambda 函数和 Amazon MWAA 环境处于同一 VPC 中,则可以在私有网络上使用此代码。对于本配置,Lambda 函数的执行角色需要获得调用 Amazon Elastic Compute Cloud(Amazon EC2)CreateNetworkInterface API 操作的权限。您可以使用 AWSLambdaVPCAccessExecutionRole
权限
要使用本页上的代码示例,Amazon MWAA 环境的执行角色需要访问权限才能执行 airflow:CreateCliToken
操作。您可以使用 AmazonMWAAAirflowCliAccess
AWS 托管策略添加此权限:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }
有关更多信息,请参阅 Apache Airflow CLI 策略:A mazonMWAAAirflow CliAccess。
依赖项
-
要在 Apache Airflow v2 中使用此代码示例,无需附加依赖项。该代码在环境中使用 Apache Airflow v2 基础版安装
。
代码示例
-
通过 https://console.aws.amazon.com/lambda/
打开 AWS Lambda 控制台。 -
从 Functions 列表中选择 Lambda 函数。
-
在函数页面上,复制以下代码并将以下代码替换为资源名称:
-
YOUR_ENVIRONMENT_NAME
– Amazon MWAA 环境名称。 -
YOUR_DAG_NAME
– 您想调用的 DAG 名称。
import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' client = boto3.client('mwaa') def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
-
-
选择部署。
-
选择测试,使用 Lambda 控制台调用函数。
-
要验证 Lambda 是否成功调用了 DAG,请使用 Amazon MWAA 控制台导航到环境的 Apache Airflow UI 界面,然后执行以下操作:
-
在 DAG 页面上,在 DAG 列表中找到新的目标 DAG。
-
在上次运行下,查看最新 DAG 运行的时间戳。此时间戳应与您其他环境中
invoke_dag
的最新时间戳非常匹配。 -
在近期任务下,检查上次运行是否成功。
-