翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Airflow からの EMR Serverless ジョブの送信
Apache Airflow の Amazon プロバイダーは、EMR Serverless 演算子を提供します。演算子の詳細については、Apache Airflow ドキュメントの「Amazon EMR Serverless Operators
EmrServerlessCreateApplicationOperator
を使用して Spark アプリケーションまたは Hive アプリケーションを作成できます。EmrServerlessStartJobOperator
を使用して、新しいアプリケーションで 1 つ以上のジョブを開始することもできます。
Airflow 2.2.2 の Amazon Managed Workflows for Apache Airflow (MWAA) で 演算子を使用するには、次の行を requirements.txt
ファイルに追加し、新しいファイルを使用するように MWAA 環境を更新します。
apache-airflow-providers-amazon==6.0.0 boto3>=1.23.9
Amazon プロバイダーのリリース 5.0.0 に EMR Serverless のサポートが追加されました。リリース 6.0.0 は、Airflow 2.2.2 と互換性のある最後のバージョンです。それ以降のバージョンは、MWAA の Airflow 2.4.3 で使用できます。
次の簡単な例は、アプリケーションを作成し、複数の Spark ジョブを実行してからアプリケーションを停止する方法を示しています。完全な例は、EMR Serverless SamplessparkSubmit
設定の詳細については、「EMR Serverless ジョブ実行時の Spark 設定の使用」を参照してください。
from datetime import datetime from airflow import DAG from airflow.providers.amazon.aws.operators.emr import ( EmrServerlessCreateApplicationOperator, EmrServerlessStartJobOperator, EmrServerlessDeleteApplicationOperator, ) # Replace these with your correct values JOB_ROLE_ARN = "arn:aws:iam::
account-id
:role/emr_serverless_default_role" S3_LOGS_BUCKET = "amzn-s3-demo-bucket
" DEFAULT_MONITORING_CONFIG = { "monitoringConfiguration": { "s3MonitoringConfiguration": {"logUri": f"s3://amzn-s3-demo-bucket
/logs/"} }, } with DAG( dag_id="example_endtoend_emr_serverless_job", schedule_interval=None, start_date=datetime(2021, 1, 1), tags=["example"], catchup=False, ) as dag: create_app = EmrServerlessCreateApplicationOperator( task_id="create_spark_app", job_type="SPARK", release_label="emr-6.7.0", config={"name": "airflow-test"}, ) application_id = create_app.output job1 = EmrServerlessStartJobOperator( task_id="start_job_1", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py", } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) job2 = EmrServerlessStartJobOperator( task_id="start_job_2", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py", "entryPointArguments": ["1000"] } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) delete_app = EmrServerlessDeleteApplicationOperator( task_id="delete_app", application_id=application_id, trigger_rule="all_done", ) (create_app >> [job1, job2] >> delete_app)