Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Invio di lavori EMR Serverless da Airflow
L'Amazon Provider di Apache Airflow fornisce operatori EMR Serverless. Per ulteriori informazioni sugli operatori, consulta Amazon EMR Serverless Operators nella documentazione
Puoi usarlo EmrServerlessCreateApplicationOperator
per creare un'applicazione Spark o Hive. Puoi anche utilizzarla EmrServerlessStartJobOperator
per avviare uno o più lavori con la tua nuova applicazione.
Per utilizzare l'operatore con Amazon Managed Workflows for Apache Airflow (MWAA) con Airflow 2.2.2, aggiungi la riga seguente al requirements.txt
file e aggiorna l'MWAAambiente per utilizzare il nuovo file.
apache-airflow-providers-amazon==6.0.0 boto3>=1.23.9
Tieni presente che il supporto EMR Serverless è stato aggiunto alla versione 5.0.0 del provider Amazon. La release 6.0.0 è l'ultima versione compatibile con Airflow 2.2.2. È possibile utilizzare versioni successive con Airflow 2.4.3 attivo. MWAA
Il seguente esempio abbreviato mostra come creare un'applicazione, eseguire più job Spark e quindi arrestare l'applicazione. Un esempio completo è disponibile nel repository EMRServerless SamplessparkSubmit
configurazione, vedere. Offerte di lavoro Spark
from datetime import datetime from airflow import DAG from airflow.providers.amazon.aws.operators.emr import ( EmrServerlessCreateApplicationOperator, EmrServerlessStartJobOperator, EmrServerlessDeleteApplicationOperator, ) # Replace these with your correct values JOB_ROLE_ARN = "arn:aws:iam::
account-id
:role/emr_serverless_default_role" S3_LOGS_BUCKET = "amzn-s3-demo-bucket
" DEFAULT_MONITORING_CONFIG = { "monitoringConfiguration": { "s3MonitoringConfiguration": {"logUri": f"s3://amzn-s3-demo-bucket
/logs/"} }, } with DAG( dag_id="example_endtoend_emr_serverless_job", schedule_interval=None, start_date=datetime(2021, 1, 1), tags=["example"], catchup=False, ) as dag: create_app = EmrServerlessCreateApplicationOperator( task_id="create_spark_app", job_type="SPARK", release_label="emr-6.7.0", config={"name": "airflow-test"}, ) application_id = create_app.output job1 = EmrServerlessStartJobOperator( task_id="start_job_1", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py", } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) job2 = EmrServerlessStartJobOperator( task_id="start_job_2", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py", "entryPointArguments": ["1000"] } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) delete_app = EmrServerlessDeleteApplicationOperator( task_id="delete_app", application_id=application_id, trigger_rule="all_done", ) (create_app >> [job1, job2] >> delete_app)