Submitting EMR Serverless jobs from Airflow

The Amazon Provider in Apache Airflow provides EMR Serverless operators. For more information about the operators, see Amazon EMR Serverless Operators in the Apache Airflow documentation.

You can use EmrServerlessCreateApplicationOperator to create a Spark or Hive application. You can also use EmrServerlessStartJobOperator to start one or more jobs with your new application.
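As a quick illustration, the following is a minimal sketch of a Hive application and job. The DAG name, application name, IAM role ARN, and S3 script location are hypothetical placeholders; the complete Spark example later in this section shows the same operators in a full DAG.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
)

# Hypothetical values -- replace with your own role ARN and script location.
JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role"
HIVE_QUERY = "s3://amzn-s3-demo-bucket/scripts/create_table.sql"

with DAG(
    dag_id="example_emr_serverless_hive_job",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    # Create a Hive application instead of a Spark one.
    create_hive_app = EmrServerlessCreateApplicationOperator(
        task_id="create_hive_app",
        job_type="HIVE",
        release_label="emr-6.7.0",
        config={"name": "airflow-hive-test"},
    )

    # Run a HiveQL script stored in S3 on the new application.
    hive_job = EmrServerlessStartJobOperator(
        task_id="start_hive_job",
        application_id=create_hive_app.output,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={"hive": {"query": HIVE_QUERY}},
    )

    create_hive_app >> hive_job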

To use the operators with Amazon Managed Workflows for Apache Airflow (MWAA) with Airflow 2.2.2, add the following lines to your requirements.txt file and update your MWAA environment to use the new file.

apache-airflow-providers-amazon==6.0.0
boto3>=1.23.9

Note that EMR Serverless support was added in version 5.0.0 of the Amazon Provider. Version 6.0.0 is the latest version compatible with Airflow 2.2.2. You can use later versions with Airflow 2.4.3 on MWAA.
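If you prefer to script the environment update rather than use the console, the following is a rough sketch using boto3. The bucket name, object key, and environment name are placeholders, and it assumes the requirements file lives in the environment's versioned source bucket.

import boto3

# Hypothetical names -- replace with your environment's source bucket and name.
SOURCE_BUCKET = "amzn-s3-demo-bucket"
ENVIRONMENT_NAME = "MyAirflowEnvironment"

s3 = boto3.client("s3")
mwaa = boto3.client("mwaa")

# Upload the updated requirements.txt to the environment's source bucket.
# MWAA source buckets have versioning enabled, so the response includes a VersionId.
with open("requirements.txt", "rb") as f:
    response = s3.put_object(Bucket=SOURCE_BUCKET, Key="requirements.txt", Body=f)

# Point the environment at the new object version; MWAA then rebuilds the
# workers with the updated provider packages.
mwaa.update_environment(
    Name=ENVIRONMENT_NAME,
    RequirementsS3Path="requirements.txt",
    RequirementsS3ObjectVersion=response["VersionId"],
)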

The following abbreviated example shows how to create an application, run multiple Spark jobs, and then stop the application. A complete example is available in the EMR Serverless Samples GitHub repository. For additional details on the sparkSubmit configuration, see Using Spark configurations when you run EMR Serverless jobs.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
    EmrServerlessDeleteApplicationOperator,
)

# Replace these with your correct values
JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role"
S3_LOGS_BUCKET = "amzn-s3-demo-bucket"

DEFAULT_MONITORING_CONFIG = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://{S3_LOGS_BUCKET}/logs/"}
    },
}

with DAG(
    dag_id="example_endtoend_emr_serverless_job",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    create_app = EmrServerlessCreateApplicationOperator(
        task_id="create_spark_app",
        job_type="SPARK",
        release_label="emr-6.7.0",
        config={"name": "airflow-test"},
    )

    application_id = create_app.output

    job1 = EmrServerlessStartJobOperator(
        task_id="start_job_1",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py",
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    job2 = EmrServerlessStartJobOperator(
        task_id="start_job_2",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
                "entryPointArguments": ["1000"],
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    delete_app = EmrServerlessDeleteApplicationOperator(
        task_id="delete_app",
        application_id=application_id,
        trigger_rule="all_done",
    )

    (create_app >> [job1, job2] >> delete_app)
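To pass Spark settings to a job, you can also include sparkSubmitParameters in the job driver. The snippet below is a sketch of a variation of start_job_2 that would be added inside the with DAG block above, reusing its application_id, JOB_ROLE_ARN, and DEFAULT_MONITORING_CONFIG; the specific --conf values are illustrative only.

    job_with_spark_conf = EmrServerlessStartJobOperator(
        task_id="start_job_with_spark_conf",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
                "entryPointArguments": ["1000"],
                # Illustrative Spark settings; tune these for your own workload.
                "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g",
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )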