

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Lambda DAGs 函数进行调用
<a name="samples-lambda"></a>

以下代码示例使用 [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html) 函数获取 Apache Airflow CLI 令牌并在 Amazon MWAA 环境中调用有向无环图（DAG）。

**Topics**
+ [版本](#samples-lambda-version)
+ [先决条件](#samples-lambda-prereqs)
+ [Permissions](#samples-lambda-permissions)
+ [依赖项](#samples-lambda-dependencies)
+ [代码示例](#samples-lambda-code)

## 版本
<a name="samples-lambda-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-lambda-prereqs"></a>

要使用此代码示例，您必须：
+ 使用 [Amazon MWAA](get-started.md) 环境[公共网络访问模式](configuring-networking.md#webserver-options-public-network-onconsole)。
+ 使用最新的 Python 运行时创建一个 [Lambda 函数](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html)。

**注意**  
如果 Lambda 函数和 Amazon MWAA 环境处于同一 VPC 中，则可以在私有网络上使用此代码。对于本配置，Lambda 函数的执行角色需要获得调用 Amazon Elastic Compute Cloud（Amazon EC2）**CreateNetworkInterface** API 操作的权限。您可以使用 [https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor](https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor) AWS-managed 策略提供此权限。

## Permissions
<a name="samples-lambda-permissions"></a>

要使用本页上的代码示例，Amazon MWAA 环境的执行角色需要访问权限才能执行 `airflow:CreateCliToken` 操作。您可以使用 `AmazonMWAAAirflowCliAccess` AWS-managed 策略提供此权限：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

有关更多信息，请参阅[Apache Airflow CLI 政策：亚马逊 MWAAAirflow CliAccess](access-policies.md#cli-access)。

## 依赖项
<a name="samples-lambda-dependencies"></a>

要在 Apache Airflow v2 和更高版本中使用此代码示例，无需附加依赖项。用于[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)安装 Apache Airflow。

## 代码示例
<a name="samples-lambda-code"></a>

1. 打开 AWS Lambda 控制台，网址为[https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/)。

1. 从 **Functions** 列表中选择 Lambda 函数。

1. 在函数页面上，复制以下代码并将以下代码替换为资源名称：
   + `YOUR_ENVIRONMENT_NAME` – Amazon MWAA 环境名称。
   + `YOUR_DAG_NAME` – 您想调用的 DAG 名称。

   ```
   import boto3
   import http.client
   import base64
   import ast
   mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
   dag_name = 'YOUR_DAG_NAME'
   mwaa_cli_command = 'dags trigger'
   ​
   client = boto3.client('mwaa')
   ​
   def lambda_handler(event, context):
       # get web token
       mwaa_cli_token = client.create_cli_token(
           Name=mwaa_env_name
       )
       
       conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
       payload = mwaa_cli_command + " " + dag_name
       headers = {
         'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
         'Content-Type': 'text/plain'
       }
       conn.request("POST", "/aws_mwaa/cli", payload, headers)
       res = conn.getresponse()
       data = res.read()
       dict_str = data.decode("UTF-8")
       mydata = ast.literal_eval(dict_str)
       return base64.b64decode(mydata['stdout'])
   ```

1. 选择**部署**。

1. 选择**测试**，使用 Lambda 控制台调用函数。

1. 要验证 Lambda 是否成功调用了 DAG，请使用 Amazon MWAA 控制台导航到环境的 Apache Airflow UI 界面，然后执行以下操作：

   1. 在该**DAGs**页面上，在列表中找到您的新目标 DAG DAGs。

   1. 在**上次运行**下，查看最新 DAG 运行的时间戳。此时间戳应与您其他环境中 `invoke_dag` 的最新时间戳非常匹配。

   1. 在**近期任务**下，检查上次运行是否成功。