Airflow에서 EMR Serverless 작업 제출
Apache Airflow의 Amazon Provider에서는 EMR Serverless 연산자를 제공합니다. 연산자에 대한 자세한 내용은 Apache Airflow 설명서의 Amazon EMR Serverless Operators
EmrServerlessCreateApplicationOperator
를 사용하여 Spark 또는 Hive 애플리케이션을 생성할 수 있습니다. EmrServerlessStartJobOperator
를 사용하여 새 애플리케이션으로 하나 이상의 작업을 시작할 수도 있습니다.
Airflow 2.2.2와 함께 Amazon Managed Workflows for Apache Airflow(MWAA)에서 연산자를 사용하려면 requirements.txt
파일에 다음 줄을 추가하고 MWAA 환경을 업데이트하여 새 파일을 사용합니다.
apache-airflow-providers-amazon==6.0.0 boto3>=1.23.9
Amazon 제공업체의 릴리스 5.0.0에는 EMR Serverless 지원이 추가되었습니다. 릴리스 6.0.0은 Airflow 2.2.2와 호환되는 마지막 버전입니다. MWAA의 Airflow 2.4.3에서 이후 버전을 사용할 수 있습니다.
다음 약식 예제에서는 애플리케이션을 생성하고 여러 Spark 작업을 실행한 다음, 애플리케이션을 중지하는 방법을 보여줍니다. 전체 예제는 EMR Serverless SamplessparkSubmit
구성에 대한 자세한 내용은 EMR Serverless 작업을 실행하는 경우 Spark 구성 사용 섹션을 참조하세요.
from datetime import datetime from airflow import DAG from airflow.providers.amazon.aws.operators.emr import ( EmrServerlessCreateApplicationOperator, EmrServerlessStartJobOperator, EmrServerlessDeleteApplicationOperator, ) # Replace these with your correct values JOB_ROLE_ARN = "arn:aws:iam::
account-id
:role/emr_serverless_default_role" S3_LOGS_BUCKET = "amzn-s3-demo-bucket
" DEFAULT_MONITORING_CONFIG = { "monitoringConfiguration": { "s3MonitoringConfiguration": {"logUri": f"s3://amzn-s3-demo-bucket
/logs/"} }, } with DAG( dag_id="example_endtoend_emr_serverless_job", schedule_interval=None, start_date=datetime(2021, 1, 1), tags=["example"], catchup=False, ) as dag: create_app = EmrServerlessCreateApplicationOperator( task_id="create_spark_app", job_type="SPARK", release_label="emr-6.7.0", config={"name": "airflow-test"}, ) application_id = create_app.output job1 = EmrServerlessStartJobOperator( task_id="start_job_1", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py", } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) job2 = EmrServerlessStartJobOperator( task_id="start_job_2", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py", "entryPointArguments": ["1000"] } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) delete_app = EmrServerlessDeleteApplicationOperator( task_id="delete_app", application_id=application_id, trigger_rule="all_done", ) (create_app >> [job1, job2] >> delete_app)