Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Il codice di esempio seguente importa le variabili utilizzando la CLI su Amazon Managed Workflows for Apache Airflow.
Versione
Prerequisiti
-
Non sono necessarie autorizzazioni aggiuntive per utilizzare l'esempio di codice in questa pagina.
Autorizzazioni
Il tuo AWS account deve accedere alla AmazonMWAAAirflowCliAccess
politica. Per ulteriori informazioni, consulta Politica della CLI di Apache Airflow: Amazon MWAAAirflow CliAccess.
Dipendenze
-
Per utilizzare questo esempio di codice con Apache Airflow v2, non sono necessarie dipendenze aggiuntive. Il codice utilizza l'installazione di base di Apache Airflow v2
nell'ambiente in uso.
Esempio di codice
Il codice di esempio seguente richiede tre input: il nome dell'ambiente Amazon MWAA (inmwaa_env
), la AWS regione dell'ambiente (inaws_region
) e il file locale che contiene le variabili che desideri importare (in). var_file
import boto3
import json
import requests
import base64
import getopt
import sys
argv = sys.argv[1:]
mwaa_env=''
aws_region=''
var_file=''
try:
opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region'])
#if len(opts) == 0 and len(opts) > 3:
if len(opts) != 3:
print ('Usage: -e MWAA environment -v variable file location and filename -r aws region')
else:
for opt, arg in opts:
if opt in ("-e"):
mwaa_env=arg
elif opt in ("-r"):
aws_region=arg
elif opt in ("-v"):
var_file=arg
boto3.setup_default_session(region_name="{}".format(aws_region))
mwaa_env_name = "{}".format(mwaa_env)
client = boto3.client('mwaa')
mwaa_cli_token = client.create_cli_token(
Name=mwaa_env_name
)
with open ("{}".format(var_file), "r") as myfile:
fileconf = myfile.read().replace('\n', '')
json_dictionary = json.loads(fileconf)
for key in json_dictionary:
print(key, " ", json_dictionary[key])
val = (key + " " + json_dictionary[key])
mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
raw_data = "variables set {0}".format(val)
mwaa_response = requests.post(
mwaa_webserver_hostname,
headers={
'Authorization': mwaa_auth_token,
'Content-Type': 'text/plain'
},
data=raw_data
)
mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
print(mwaa_response.status_code)
print(mwaa_std_err_message)
print(mwaa_std_out_message)
except:
print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region')
print("Unexpected error:", sys.exc_info()[0])
sys.exit(2)
Fasi successive
-
Scopri come caricare il codice DAG in questo esempio nella
dags
cartella del tuo bucket Amazon S3 in. Aggiungere o aggiornare DAGs