從 Airflow 提交無EMR伺服器任務 - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

從 Airflow 提交無EMR伺服器任務

Apache Airflow 中的 Amazon Provider 提供無EMR伺服器運算子。如需運算子的詳細資訊,請參閱 Apache Airflow 文件中的 Amazon EMR Serverless 運算子

您可以使用 EmrServerlessCreateApplicationOperator來建立 Spark 或 Hive 應用程式。您也可以使用 EmrServerlessStartJobOperator 來啟動一或多個使用新應用程式的任務。

若要將 運算子與 Amazon Managed Workflows for Apache Airflow (MWAA) 搭配使用,搭配 Airflow 2.2.2,請將以下行新增至您的requirements.txt檔案,並更新您的MWAA環境以使用新檔案。

apache-airflow-providers-amazon==6.0.0 boto3>=1.23.9

請注意,Amazon 供應商的 5.0.0 版已新增無EMR伺服器支援。6.0.0 版是與 Airflow 2.2.2 相容的最新版本。您可以在 上使用更新版本搭配 Airflow 2.4.3。 MWAA

下列縮寫範例示範如何建立應用程式、執行多個 Spark 任務,然後停止應用程式。EMR Serverless Samples GitHub 儲存庫中提供完整範例。如需sparkSubmit組態的其他詳細資訊,請參閱 執行無EMR伺服器任務時使用 Spark 組態

from datetime import datetime from airflow import DAG from airflow.providers.amazon.aws.operators.emr import ( EmrServerlessCreateApplicationOperator, EmrServerlessStartJobOperator, EmrServerlessDeleteApplicationOperator, ) # Replace these with your correct values JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role" S3_LOGS_BUCKET = "amzn-s3-demo-bucket" DEFAULT_MONITORING_CONFIG = { "monitoringConfiguration": { "s3MonitoringConfiguration": {"logUri": f"s3://amzn-s3-demo-bucket/logs/"} }, } with DAG( dag_id="example_endtoend_emr_serverless_job", schedule_interval=None, start_date=datetime(2021, 1, 1), tags=["example"], catchup=False, ) as dag: create_app = EmrServerlessCreateApplicationOperator( task_id="create_spark_app", job_type="SPARK", release_label="emr-6.7.0", config={"name": "airflow-test"}, ) application_id = create_app.output job1 = EmrServerlessStartJobOperator( task_id="start_job_1", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py", } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) job2 = EmrServerlessStartJobOperator( task_id="start_job_2", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py", "entryPointArguments": ["1000"] } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) delete_app = EmrServerlessDeleteApplicationOperator( task_id="delete_app", application_id=application_id, trigger_rule="all_done", ) (create_app >> [job1, job2] >> delete_app)