翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Lambda 関数を使用して DAG を呼び出す
次のコード例では、AWS Lambda 関数を使用して Apache Airflow CLI トークンを取得し、Amazon MWAA 環境で有向非巡回グラフ (DAG) を呼び出します。
Version
-
このページのコード例は、Python 3.10
の Apache Airflow v2 と共に使用可能です。
前提条件
コードサンプルを使用するには、以下が必要です。
-
Amazon MWAA 環境には、パブリックネットワークアクセスモードを使用してください。
-
最新の Python ランタイムを使用する Lambda 関数を用意してください。
注記
Lambda 関数と Amazon MWAA 環境が同じ VPC にある場合は、このコードをプライベートネットワークで使用できます。この構成では、Lambda 関数の実行ロールに、Amazon Elastic Compute Cloud (Amazon EC2) CreateNetworkInterface API オペレーションを呼び出すアクセス許可が必要です。AWSLambdaVPCAccessExecutionRole
アクセス許可
このページのコード例を使用するには、Amazon MWAA 環境の実行ロールが airflow:CreateCliToken
アクションを実行するためのアクセス権が必要です。AmazonMWAAAirflowCliAccess
AWS マネージドポリシーを使用して、このアクセス権限を提供できます。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }
詳細については、「Apache Airflow CLIポリシー: AmazonMWAAAirflowCliAccess」を参照してください。
依存関係
-
このコード例を Apache Airflow v2 で使用する場合、追加の依存関係は必要ありません。このコードでは、お使いの環境にある Apache Airflow v2 のベースインストール
を使用します。
コード例
-
AWS Lambda コンソールを https://console.aws.amazon.com/lambda/
で開きます。 -
関数リストから Lambda 関数を選択します。
-
関数ページで次のコードをコピーし、以下をリソース名に置き換えます。
-
YOUR_ENVIRONMENT_NAME
– Amazon MWAA 環境の名前。 -
YOUR_DAG_NAME
— 呼び出したい DAG の名前。
import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' client = boto3.client('mwaa') def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
-
-
[デプロイ] を選択します。
-
[テスト] を選択し、Lambda コンソールを使用して関数を呼び出します。
-
Lambda が DAG を正常に呼び出したことを確認するには、Amazon MWAA コンソールを使用して、お使いの環境の Apache Airflow UI に移動し、次の操作を行います。
-
[DAG] ページの DAG のリストから新しいターゲット DAG を見つけます。
-
[前回の実行] で、最新の DAG 実行のタイムスタンプを確認します。このタイムスタンプは、他の環境における
invoke_dag
の最新のタイムスタンプとほぼ一致する必要があります。 -
[最近のタスク] で、前回の実行が成功したことを確認します。
-