本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
Apache Airflow CLI 命令參考
本主題說明 Amazon Managed Workflows for Apache Airflow 上支援和不支援的 Apache Airflow CLI 命令。
提示
REST API 比 CLI 更現代化,旨在與外部系統進行程式設計整合。REST 是與 Apache Airflow 互動的偏好方式。
內容
先決條件
下一節說明使用此頁面上的命令和指令碼所需的初步步驟。
存取
-
AWS 帳戶 在 AWS Identity and Access Management (IAM) 中存取 Amazon MWAA 許可政策Apache Airflow UI 存取政策:AmazonMWAAWebServerAccess。
-
AWS 帳戶 在 AWS Identity and Access Management (IAM) 中存取 Amazon MWAA 許可政策 完整 API 和主控台存取政策:AmazonMWAAFullApiAccess。
AWS CLI
AWS Command Line Interface (AWS CLI) 是一種開放原始碼工具,可讓您使用命令列 Shell 中的命令與 AWS 服務互動。若要完成此頁面上的步驟,您需要下列項目:
有何變更?
-
v3:氣流架構。Apache Airflow v3 引進突破性的架構變更,以提供更高的安全性和可擴展性,並使維護更容易。若要進一步了解,請參閱升級至 Airflow 3
。 -
v2:Airflow CLI 命令結構。Apache Airflow v2 CLI 經過組織,因此相關命令會分組為子命令,這表示如果您想要升級至 Apache Airflow v2,則需要更新 Apache Airflow v1 指令碼。例如,
unpause在 Apache Airflow v1dags unpause中為 Apache Airflow v2。若要進一步了解,請參閱 2.0 中的 Airflow CLI 變更。
支援的 CLI 命令
下一節列出 Amazon MWAA 上可用的 Apache Airflow CLI 命令。
支援的命令
使用剖析 DAGs命令
如果您的環境執行 Apache Airflow v2.0.2,如果 DAGs 使用依賴透過 安裝的套件的外掛程式,則剖析 DAG 的 CLI 命令將會失敗requirements.txt:
Apache Airflow 2.0.2 版
-
dags backfill -
dags list -
dags list-runs -
dags next-execution
如果您的 DAGs 不使用依賴透過 安裝的套件的外掛程式,您可以使用這些 CLI 命令requirements.txt。
範本程式碼
下一節包含使用 Apache Airflow CLI 的不同方法範例。
設定、取得或刪除 Apache Airflow v2 變數
您可以使用下列範例程式碼來設定、取得或刪除格式為 的變數<script> <mwaa env name> get | set | delete <variable> <variable value> </variable> </variable>。
[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit if [[ $2 == "" ]]; then dag="variables list" elif [ $2 == "get" ] || [ $2 == "delete" ] || [ $2 == "set" ]; then dag="variables $2 $3 $4 $5" else echo "Not a valid command" exit 1 fi CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \ && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \ && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \ && CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \ --header "Authorization: Bearer $CLI_TOKEN" \ --header "Content-Type: text/plain" \ --data-raw "$dag" ) \ && echo "Output:" \ && echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \ && echo "Errors:" \ && echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode
觸發 DAG 時新增組態
您可以在觸發 DAG 時使用下列範例程式碼搭配 Apache Airflow v2 來新增組態,例如 airflow trigger_dag 'dag_name' —conf '{"key":"value"}'。
import boto3 import json import requests import base64 mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' key = "YOUR_KEY" value = "YOUR_VALUE" conf = "{\"" + key + "\":\"" + value + "\"}" client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "trigger_dag {0} -c '{1}'".format(dag_name, conf) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message)
在對堡壘主機的 SSH 通道上執行 CLI 命令
使用以下範例,使用 SSH 通道代理對 Linux 堡壘主機執行 Airflow CLI 命令。
使用 curl
-
ssh -D 8080 -f -C -q -NYOUR_USER@YOUR_BASTION_HOST -
curl -x socks5h://0:8080 --request POST https://YOUR_HOST_NAME/aws_mwaa/cli --headerYOUR_HEADERS--data-rawYOUR_CLI_COMMAND