

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Lambda 関数を使用して DAG を呼び出す
<a name="samples-lambda"></a>

次のコード例では、[AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html) 関数を使用して Apache Airflow CLI トークンを取得し、Amazon MWAA 環境で有向非巡回グラフ (DAG) を呼び出します。

**Topics**
+ [バージョン](#samples-lambda-version)
+ [前提条件](#samples-lambda-prereqs)
+ [アクセス許可](#samples-lambda-permissions)
+ [依存関係](#samples-lambda-dependencies)
+ [コード例](#samples-lambda-code)

## バージョン
<a name="samples-lambda-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-lambda-prereqs"></a>

コードサンプルを使用するには、以下が必要です。
+ [Amazon MWAA 環境](get-started.md) には、[パブリックネットワークアクセスモード](configuring-networking.md#webserver-options-public-network-onconsole) を使用してください。
+ 最新の Python ランタイムを使用する [Lambda 関数](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html) を用意してください。

**注記**  
Lambda 関数と Amazon MWAA 環境が同じ VPC にある場合は、このコードをプライベートネットワークで使用できます。この設定では、Lambda 関数の実行ロールに、Amazon Elastic Compute Cloud (Amazon EC2) **CreateNetworkInterface** API オペレーションを呼び出すアクセス許可が必要です。このアクセス許可は、 [https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor](https://console.aws.amazon.com/iam/home?#/policies/arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole$jsonEditor) AWS管理ポリシーを使用して指定できます。

## アクセス許可
<a name="samples-lambda-permissions"></a>

このページのコード例を使用するには、Amazon MWAA 環境の実行ロールが `airflow:CreateCliToken` アクションを実行するためのアクセス権が必要です。このアクセス許可は、 `AmazonMWAAAirflowCliAccess` AWS管理ポリシーを使用して付与できます。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

詳細については、[Apache Airflow CLI ポリシー: AmazonMWAAAirflowCliAccess](access-policies.md#cli-access) を参照してください。

## 依存関係
<a name="samples-lambda-dependencies"></a>

このコード例を Apache Airflow v2 以降で使用する場合、追加の依存関係は必要ありません。[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) を使用して、Apache Airflow をインストールします。

## コード例
<a name="samples-lambda-code"></a>

1. [https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/) で AWS Lambda コンソールを開きます。

1. **関数** リストから Lambda 関数を選択します。

1. 関数ページで次のコードをコピーし、以下をリソース名に置き換えます。
   + `YOUR_ENVIRONMENT_NAME` – Amazon MWAA 環境の名前。
   + `YOUR_DAG_NAME` — 呼び出したい DAG の名前。

   ```
   import boto3
   import http.client
   import base64
   import ast
   mwaa_env_name = 'YOUR_ENVIRONMENT_NAME'
   dag_name = 'YOUR_DAG_NAME'
   mwaa_cli_command = 'dags trigger'
   ​
   client = boto3.client('mwaa')
   ​
   def lambda_handler(event, context):
       # get web token
       mwaa_cli_token = client.create_cli_token(
           Name=mwaa_env_name
       )
       
       conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname'])
       payload = mwaa_cli_command + " " + dag_name
       headers = {
         'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'],
         'Content-Type': 'text/plain'
       }
       conn.request("POST", "/aws_mwaa/cli", payload, headers)
       res = conn.getresponse()
       data = res.read()
       dict_str = data.decode("UTF-8")
       mydata = ast.literal_eval(dict_str)
       return base64.b64decode(mydata['stdout'])
   ```

1. **デプロイ** を選択します。

1. **テスト** を選択し、Lambda コンソールを使用して関数を呼び出します。

1. Lambda が DAG を正常に呼び出したことを確認するには、Amazon MWAA コンソールを使用して、お使いの環境の Apache Airflow UI に移動し、次の操作を行います。

   1. **DAG** ページの DAG のリストから新しいターゲット DAG を見つけます。

   1. **前回の実行** で、最新の DAG 実行のタイムスタンプを確認します。このタイムスタンプは、他の環境における `invoke_dag` の最新のタイムスタンプとほぼ一致する必要があります。

   1. **[最近のタスク]** で、前回の実行が成功したことを確認します。