Envio de trabalhos do EMR Sem Servidor provenientes do Airflow - Amazon EMR

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Envio de trabalhos do EMR Sem Servidor provenientes do Airflow

O provedor da Amazon no Apache Airflow fornece operadores do EMR Sem Servidor. Para obter mais informações sobre operadores, consulte Amazon EMR Serverless Operators na documentação do Apache Airflow.

Você pode usar EmrServerlessCreateApplicationOperator para criar uma aplicação do Spark ou do Hive. Você também pode usar EmrServerlessStartJobOperator para iniciar um ou mais trabalhos com a nova aplicação.

Para usar o operador com o Amazon Managed Workflows para Apache Airflow (MWAA) com Airflow 2.2.2, adicione a linha a seguir ao arquivo requirements.txt e atualize o ambiente do MWAA para usar o novo arquivo.

apache-airflow-providers-amazon==6.0.0 boto3>=1.23.9

Observe que o suporte ao EMR Sem Servidor foi adicionado à versão 5.0.0 do provedor da Amazon. A versão 6.0.0 é a última compatível com o Airflow 2.2.2. Você pode usar versões posteriores com o Airflow 2.4.3 no MWAA.

O exemplo abreviado a seguir mostra como criar uma aplicação, executar vários trabalhos do Spark e, em seguida, interromper a aplicação. Um exemplo completo está disponível no repositório do GitHub EMR Serverless Samples. Para obter detalhes adicionais sobre a configuração sparkSubmit, consulte Usando as configurações do Spark ao executar trabalhos sem servidor EMR.

from datetime import datetime from airflow import DAG from airflow.providers.amazon.aws.operators.emr import ( EmrServerlessCreateApplicationOperator, EmrServerlessStartJobOperator, EmrServerlessDeleteApplicationOperator, ) # Replace these with your correct values JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role" S3_LOGS_BUCKET = "amzn-s3-demo-bucket" DEFAULT_MONITORING_CONFIG = { "monitoringConfiguration": { "s3MonitoringConfiguration": {"logUri": f"s3://amzn-s3-demo-bucket/logs/"} }, } with DAG( dag_id="example_endtoend_emr_serverless_job", schedule_interval=None, start_date=datetime(2021, 1, 1), tags=["example"], catchup=False, ) as dag: create_app = EmrServerlessCreateApplicationOperator( task_id="create_spark_app", job_type="SPARK", release_label="emr-6.7.0", config={"name": "airflow-test"}, ) application_id = create_app.output job1 = EmrServerlessStartJobOperator( task_id="start_job_1", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py", } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) job2 = EmrServerlessStartJobOperator( task_id="start_job_2", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py", "entryPointArguments": ["1000"] } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) delete_app = EmrServerlessDeleteApplicationOperator( task_id="delete_app", application_id=application_id, trigger_rule="all_done", ) (create_app >> [job1, job2] >> delete_app)