本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
以下示例代码会使用 Amazon MWAA 上的 CLI 导入变量。
版本
-
您可以在 Python 3.10
中将本页上的代码示例与 Apache Airflow v2 一起使用。
先决条件
-
无需其他权限即可使用本页上的代码示例。
权限
您的 AWS 账户需要访问该AmazonMWAAAirflowCliAccess
政策。要了解更多信息,请参阅 Apache Airflow CLI 政策:亚马逊 MWAAAirflow CliAccess。
依赖项
-
要在 Apache Airflow v2 中使用此代码示例,无需附加依赖项。该代码在环境中使用 Apache Airflow v2 基础版安装
。
代码示例
以下示例代码需要三个输入:您的 Amazon MWAA 环境名称(中mwaa_env
)、您的环境 AWS 区域(中aws_region
)和包含要导入的变量的本地文件(中var_file
)。
import boto3
import json
import requests
import base64
import getopt
import sys
argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''
try:
opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region'])
#if len(opts) == 0 and len(opts) > 3:
if len(opts) != 3:
print ('Usage: -e MWAA environment -v variable file location and filename -r aws region')
else:
for opt, arg in opts:
if opt in ("-e"):
mwaa_env=arg
elif opt in ("-r"):
aws_region=arg
elif opt in ("-v"):
var_file=arg
boto3.setup_default_session(region_name="{}".format(aws_region))
mwaa_env_name = "{}".format(mwaa_env)
client = boto3.client('mwaa')
mwaa_cli_token = client.create_cli_token(
Name=mwaa_env_name
)
with open ("{}".format(var_file), "r") as myfile:
fileconf = myfile.read().replace('\n', '')
json_dictionary = json.loads(fileconf)
for key in json_dictionary:
print(key, " ", json_dictionary[key])
val = (key + " " + json_dictionary[key])
mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
raw_data = "variables set {0}".format(val)
mwaa_response = requests.post(
mwaa_webserver_hostname,
headers={
'Authorization': mwaa_auth_token,
'Content-Type': 'text/plain'
},
data=raw_data
)
mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
print(mwaa_response.status_code)
print(mwaa_std_err_message)
print(mwaa_std_out_message)
except:
print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
print("Unexpected error:", sys.exc_info()[0])
sys.exit(2)
接下来做什么?
-
要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的
dags
文件夹,请参阅 添加或更新 DAGs。