Lambda 関数を使用して DAG を呼び出す - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Lambda 関数を使用して DAG を呼び出す

次のコード例では、AWS Lambda 関数を使用して Apache Airflow CLI トークンを取得し、Amazon MWAA 環境で有向非巡回グラフ (DAG) を呼び出します。

Version

  • このページのコード例は、Python 3.10Apache Airflow v2 と共に使用可能です。

前提条件

コードサンプルを使用するには、以下が必要です。

注記

Lambda 関数と Amazon MWAA 環境が同じ VPC にある場合は、このコードをプライベートネットワークで使用できます。この構成では、Lambda 関数の実行ロールに、Amazon Elastic Compute Cloud (Amazon EC2) CreateNetworkInterface API オペレーションを呼び出すアクセス許可が必要です。AWSLambdaVPCAccessExecutionRole AWS マネージドポリシーを使用して、このアクセス権限を提供できます。

アクセス許可

このページのコード例を使用するには、Amazon MWAA 環境の実行ロールが airflow:CreateCliToken アクションを実行するためのアクセス権が必要です。AmazonMWAAAirflowCliAccess AWS マネージドポリシーを使用して、このアクセス権限を提供できます。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

詳細については、「Apache Airflow CLIポリシー: AmazonMWAAAirflowCliAccess」を参照してください。

依存関係

コード例

  1. AWS Lambda コンソールを https://console.aws.amazon.com/lambda/ で開きます。

  2. 関数リストから Lambda 関数を選択します。

  3. 関数ページで次のコードをコピーし、以下をリソース名に置き換えます。

    • YOUR_ENVIRONMENT_NAME – Amazon MWAA 環境の名前。

    • YOUR_DAG_NAME — 呼び出したい DAG の名前。

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. [デプロイ] を選択します。

  5. [テスト] を選択し、Lambda コンソールを使用して関数を呼び出します。

  6. Lambda が DAG を正常に呼び出したことを確認するには、Amazon MWAA コンソールを使用して、お使いの環境の Apache Airflow UI に移動し、次の操作を行います。

    1. [DAG] ページの DAG のリストから新しいターゲット DAG を見つけます。

    2. [前回の実行] で、最新の DAG 実行のタイムスタンプを確認します。このタイムスタンプは、他の環境における invoke_dag の最新のタイムスタンプとほぼ一致する必要があります。

    3. [最近のタスク] で、前回の実行が成功したことを確認します。