기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
DAG를 사용하여 CLI에서 변수 가져오기
다음 샘플 코드는 Amazon Managed Workflows for Apache Airflow의 CLI를 사용하여 변수를 가져옵니다.
버전
-
이 페이지의 코드 예제는 Python 3.10
의 Apache Airflow v2에서 사용할 수 있습니다.
사전 조건
-
이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.
권한
AWS 계정에 AmazonMWAAAirflowCliAccess
정책에 대한 액세스 권한이 필요합니다. 자세한 내용은 Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccess을 참조하십시오.
종속성
-
이 코드 예제를 Apache Airflow v2와 함께 사용하려면 추가 종속성이 필요하지 않습니다. 코드는 사용자 환경에 설치된 Apache Airflow v2 기본 설치
를 사용합니다.
코드 샘플
다음 샘플 코드는 Amazon MWAA 환경 이름(의 mwaa_env
), 환경의 AWS 리전(의 aws_region
), 가져오려는 변수가 포함된 로컬 파일(의 )의 세 가지 입력을 사용합니다var_file
.
import boto3 import json import requests import base64 import getopt import sys argv = sys.argv[1:] mwaa_env='' aws_region='' var_file='' try: opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region']) #if len(opts) == 0 and len(opts) > 3: if len(opts) != 3: print ('Usage: -e MWAA environment -v variable file location and filename -r aws region') else: for opt, arg in opts: if opt in ("-e"): mwaa_env=arg elif opt in ("-r"): aws_region=arg elif opt in ("-v"): var_file=arg boto3.setup_default_session(region_name="{}".format(aws_region)) mwaa_env_name = "{}".format(mwaa_env) client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) with open ("{}".format(var_file), "r") as myfile: fileconf = myfile.read().replace('\n', '') json_dictionary = json.loads(fileconf) for key in json_dictionary: print(key, " ", json_dictionary[key]) val = (key + " " + json_dictionary[key]) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "variables set {0}".format(val) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message) except: print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region') print("Unexpected error:", sys.exc_info()[0]) sys.exit(2)
다음 단계
-
이 예제의 DAG 코드를 DAG 추가 또는 업데이트에서 Amazon S3 버킷의
dags
폴더에 업로드하는 방법을 알아봅니다.