Ajuste de desempenho para o Apache Airflow no Amazon MWAA - Amazon Managed Workflows for Apache Airflow

Ajuste de desempenho para o Apache Airflow no Amazon MWAA

Esta página descreve as práticas recomendadas para ajustar o desempenho de um ambiente Amazon Managed Workflows for Apache Airflow usando Como usar opções de configuração do Apache Airflow no Amazon MWAA.

Como adicionar uma opção de configuração do Apache Airflow

O procedimento a seguir descreve as etapas para adicionar uma opção de configuração do Airflow ao seu ambiente.

  1. Abra a página Ambientes no console do Amazon MWAA.

  2. Escolha um ambiente.

  3. Selecione a opção Editar.

  4. Escolha Próximo.

  5. Escolha Adicionar configuração personalizada no painel Opções de configuração do Airflow.

  6. Escolha uma configuração na lista suspensa e insira um valor, ou digite uma configuração personalizada e insira um valor.

  7. Escolha Adicionar configuração personalizada para cada configuração que você deseja adicionar.

  8. Escolha Salvar.

Para saber mais, consulte Como usar opções de configuração do Apache Airflow no Amazon MWAA.

Agendador do Apache Airflow

O agendador do Apache Airflow é um componente central do Apache Airflow. Um problema com o agendador pode impedir que os DAGs sejam analisados e as tarefas sejam agendadas. Para obter mais informações sobre o ajuste do agendador do Apache Airflow, consulte Ajustar o desempenho do agendador no site de documentação do Apache Airflow.

Parâmetros

Esta seção descreve as opções de configuração disponíveis para o agendador do Apache Airflow e casos de uso do agendador.

Apache Airflow v2
Version (Versão) Opção de configuração Padrão Descrição Caso de uso

v2

celery.sync_parallelism

1

O número de processos que o Celery Executor usa para sincronizar o estado da tarefa.

Você pode usar essa opção para evitar conflitos de fila limitando os processos que o Celery Executor usa. Por padrão, o valor é definido como 1 para evitar erros na entrega de logs de tarefas ao CloudWatch Logs. Definir o valor como 0 significa usar o número máximo de processos, mas pode causar erros ao entregar logs de tarefas.

v2

scheduler.idle_sleep_time

1

O número de segundos de espera entre o processamento consecutivo do arquivo DAG no “loop” do agendador.

Você pode usar essa opção para liberar o uso da CPU no Agendador ao aumentar o tempo de inatividade do Agendador após terminar de recuperar os resultados da análise do DAG, localizar e enfileirar tarefas e executar tarefas em fila no Executor. Aumentar esse valor consome o número de threads do Agendador executados em um ambiente em scheduler.parsing_processes para o Apache Airflow v2 e em scheduler.max_threads para o Apache Airflow v1. Isso pode reduzir a capacidade dos Agendadores de analisar DAGs e aumentar o tempo necessário para que os DAGs apareçam no servidor Web.

v2

scheduler.max_dagruns_to_create_per_loop

10

O número máximo de DAGs para criar DagRuns por “loop” do Agendador.

Você pode usar essa opção para liberar recursos para agendar tarefas ao diminuir o número máximo de DagRuns para o “loop” do Agendador.

v2

scheduler.parsing_processes

Defina usando a seguinte fórmula: (2 * number of vCPUs) - 1 por padrão.

O número de threads que o Agendador pode executar paralelamente para agendar DAGs.

Você pode usar essa opção para liberar recursos ao diminuir o número de processos que o Agendador executa paralelamente para analisar DAGs. Recomendamos manter esse número baixo se a análise do DAG estiver afetando o agendamento de tarefas. Você deve especificar um valor menor que a contagem de vCPUs em seu ambiente. Para saber mais, consulte Limites.

Limites

Esta seção descreve os limites que você deve considerar ao ajustar os parâmetros padrão do agendador.

scheduler.parsing_processes, scheduler.max_threads

Dois threads são permitidos por vCPU para uma classe de ambiente. Pelo menos um thread deve ser reservado para o agendador de uma classe de ambiente. Se você notar um atraso no agendamento de tarefas, talvez seja necessário aumentar sua classe de ambiente. Por exemplo, um ambiente grande tem uma instância de contêiner Fargate de 4 vCPUs para seu agendador. Isso significa que um máximo total de 7 threads está disponível para uso em outros processos. Ou seja, dois threads multiplicaram quatro vCPUs, menos um para o próprio agendador. O valor que você especifica em scheduler.max_threads e não scheduler.parsing_processes deve exceder o número de threads disponíveis para uma classe de ambiente (conforme mostrado a seguir):

  • mw1.small: não deve exceder 1 thread para outros processos. O thread restante é reservado para o Agendador.

  • mw1.medium: não deve exceder 3 threads de outros processos. O thread restante é reservado para o Agendador.

  • mw1.large: não deve exceder 7 threads de outros processos. O thread restante é reservado para o Agendador.

Pastas do DAG

O Agendador do Apache Airflow verifica continuamente a pasta DAGs em seu ambiente. Quaisquer arquivos plugins.zip contidos ou arquivo Python (.py) contendo instruções de importação “airflow”. Todos os objetos Python DAG resultantes são então colocados em um DagBag para que esse arquivo seja processado pelo Agendador para determinar quais tarefas, se houver, precisam ser agendadas. A análise do arquivo Dag ocorre independentemente de os arquivos conterem objetos DAG viáveis.

Parâmetros

Esta seção descreve as opções de configuração disponíveis para a pasta DAGs e os casos de uso dela.

Apache Airflow v2
Version (Versão) Opção de configuração Padrão Descrição Caso de uso

v2

scheduler.dag_dir_list_interval

300 segundos

O número de segundos em que a pasta DAGs deve ser examinada em busca de novos arquivos.

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos para analisar a pasta DAGs. Recomendamos aumentar esse valor se você estiver vendo longos tempos de análise em total_parse_time metrics, o qual pode ser em virtude de um grande número de arquivos na sua pasta DAGs.

v2

scheduler.min_file_process_interval

30 segundos

O número de segundos após os quais o agendador analisa um DAG e as atualizações do DAG são refletidas.

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos que o agendador espera antes de analisar um DAG. Por exemplo, se você especificar um valor de 30, o arquivo DAG será analisado a cada 30 segundos. Recomendamos manter esse número alto para diminuir o uso da CPU em seu ambiente.

Arquivos DAG

Como parte do loop do agendador do Apache Airflow, arquivos DAG individuais são analisados para extrair objetos do DAG Python. No Apache Airflow v2 e superior, o agendador analisa um número máximo de processos de análise ao mesmo tempo. O número de segundos especificado em scheduler.min_file_process_interval deve passar antes que o mesmo arquivo seja analisado novamente.

Parâmetros

Esta seção descreve as opções de configuração disponíveis para arquivos DAG do Apache Airflow e os casos de uso deles.

Apache Airflow v2
Version (Versão) Opção de configuração Padrão Descrição Caso de uso

v2

core.dag_file_processor_timeout

50 segundos

O número de segundos antes que o DagFileProcessor atinja o tempo limite de processamento de um arquivo DAG.

Você pode usar essa opção para liberar recursos ao aumentar o tempo necessário até que o DagFileProcessor atinja o tempo limite. Recomendamos aumentar esse valor se você estiver vendo tempos limite nos logs de processamento do DAG que resultam na falta de carregamento de DAGs viáveis.

v2

core.dagbag_import_timeout

30 segundos

O tempo limite do número de segundos antes de importar um arquivo do Python.

Você pode usar essa opção para liberar recursos ao aumentar o tempo necessário até que o Agendador atinja o tempo limite ao importar um arquivo Python para extrair os objetos DAG. Essa opção é processada como parte do “loop” do Agendador e deve conter um valor menor que o valor especificado em core.dag_file_processor_timeout.

v2

core.min_serialized_dag_update_interval

30

O número mínimo de segundos após os quais os DAGs serializados no banco de dados são atualizados.

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos após os quais os DAGs serializados no banco de dados são atualizados. Recomendamos aumentar esse valor se você tiver um número grande de DAGs, ou DAGs complexos. Aumentar esse valor reduz a carga no Agendador e no banco de dados à medida que os DAGs são serializados.

v2

core.min_serialized_dag_fetch_interval

10

O número de segundos em que um DAG serializado é recuperado do banco de dados quando já está carregado no DagBag.

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos em que um DAG serializado é recuperado. O valor deve ser maior que o valor especificado em core.min_serialized_dag_update_interval para reduzir as taxas de “gravação” do banco de dados. Aumentar esse valor reduz a carga no servidor Web e no banco de dados à medida que os DAGs são serializados.

Tarefas

Tanto o agendador quanto os operadores do Apache Airflow estão envolvidos em tarefas de enfileiramento e desenfileiramento. O agendador leva as tarefas analisadas prontas para serem agendadas de um status vazio para um status agendado. O executor, também em execução no contêiner do agendador no Fargate, coloca essas tarefas em fila e define o status como Em fila. Quando os operadores têm capacidade, retiram a tarefa da fila e definem o status como executando, que posteriormente muda o status para Êxito ou Falha com base no sucesso ou falha da tarefa.

Parâmetros

Esta seção descreve as opções de configuração disponíveis para tarefas do Apache Airflow e os casos de uso delas.

As opções de configuração padrão que o Amazon MWAA substitui estão marcadas em vermelho.

Apache Airflow v2
Version (Versão) Opção de configuração Padrão Descrição Caso de uso

v2

core.parallelism

Definido dinamicamente com base em (maxWorkers * maxCeleryWorkers) / schedulers * 1.5.

O número máximo de instâncias de tarefas que podem ter o status “Em execução”.

Você pode usar essa opção para liberar recursos ao aumentar o número de instâncias de tarefas que podem ser executadas simultaneamente. O valor especificado deve ser o número de Operadores disponíveis “vezes” a densidade de tarefas dos Operadores. Recomendamos alterar esse valor somente quando você estiver vendo um grande número de tarefas bloqueadas no estado “Em execução” ou “Em fila”.

v2

core.dag_concurrency

10000

O número de instâncias de tarefas que podem ser executadas simultaneamente para cada DAG.

Você pode usar essa opção para liberar recursos ao aumentar o número de instâncias de tarefas autorizadas a serem executadas simultaneamente. Por exemplo, se você tiver cem DAGs com dez tarefas paralelas e quiser que todos os DAGs sejam executados simultaneamente, é possível calcular o paralelismo máximo como o número de Operadores disponíveis “vezes” a densidade de tarefas em celery.worker_concurrency dos Operadores, dividido pelo número de DAGs (por exemplo, 100).

v2

core.execute_tasks_new_python_interpreter

True

Determina se o Apache Airflow executa tarefas bifurcando o processo principal ou criando um processo em Python.

Quando definido como True, o Apache Airflow reconhece as alterações feitas nos seus plug-ins como um novo processo Python criado para executar tarefas.

v2

celery.worker_concurrency

N/D

O Amazon MWAA substitui a instalação básica do Airflow por essa opção para escalar Operadores como parte de seu componente de escalonamento automático.

Qualquer valor especificado para essa opção é ignorado.

v2

celery.worker_autoscale

mw1.small: 5,0

mw1.medium: 10,0

mw1.large: 20,0

mw1.xlarge: 40,0

mw1.2xlarge: 80,0

A simultaneidade de tarefas para Operadores.

Você pode usar essa opção para liberar recursos ao reduzir a simultaneidade maximum, minimum de tarefas dos Operadores. Os Operadores aceitam até maximum tarefas simultâneas configuradas, independentemente de haver recursos suficientes para fazer isso. Se as tarefas forem agendadas sem recursos suficientes, elas falharão imediatamente. Recomendamos alterar esse valor em tarefas que consomem muitos recursos ao reduzir os valores para serem menores que os padrões a fim de permitir mais capacidade por tarefa.