Escalador automático do Flink - Amazon EMR

Escalador automático do Flink

As versões 6.15.0 e superiores do Amazon EMR oferecem suporte ao escalador automático do Flink. A funcionalidade de escalador automático de trabalhos coleta métricas da execução de trabalhos de streaming do Flink e escala automaticamente os vértices individuais do trabalho. Isso reduz a contrapressão e satisfaz a meta de utilização definida por você.

Para obter mais informações, consulte a seção Autoscaler da documentação do Apache Flink Kubernetes Operator.

  • O escalador automático do Flink é compatível com o Amazon EMR 6.15.0 e versões superiores.

  • O escalador automático do Flink é compatível somente com trabalhos de streaming.

  • Só há suporte para o agendador adaptável. Não há suporte para o agendador padrão.

  • Recomendamos habilitar o ajuste de escala de clusters para permitir o provisionamento dinâmico de recursos. O Ajuste de Escala Gerenciado do Amazon EMR é preferencial porque a avaliação da métrica ocorre a cada cinco a dez segundos. Nesse intervalo, seu cluster pode se ajustar mais prontamente à mudança nos recursos necessários do cluster.

Use as etapas a seguir para habilitar o escalador automático do Flink ao criar um cluster do Amazon EMR no EC2.

  1. No console do Amazon EMR, crie um cluster do EMR.

    1. Escolha a versão emr-6.15.0 do Amazon EMR ou posterior. Selecione o pacote de aplicações Flink e quaisquer outras aplicações que você queira incluir no cluster.

      Application bundle options for Amazon EMRcluster, with Flink highlighted and selected.
    2. Na opção Ajuste de escala e provisionamento de clusters, selecione Usar Ajuste de Escala Gerenciado do EMR.

      Cluster scaling options: manual, EMR-managed (selected), or custom automatic scaling.
  2. Na seção Configurações de software, insira a configuração a seguir para habilitar o escalador automático do Flink. Em cenários de teste, defina o intervalo de decisão, o intervalo da janela de métricas e o intervalo de estabilização para um valor menor de modo que o trabalho tome imediatamente uma decisão de ajuste de escala para facilitar a verificação.

    [ { "Classification": "flink-conf", "Properties": { "job.autoscaler.enabled": "true", "jobmanager.scheduler": "adaptive", "job.autoscaler.stabilization.interval": "60s", "job.autoscaler.metrics.window": "60s", "job.autoscaler.decision.interval": "10s", "job.autoscaler.debug.logs.interval": "60s" } } ]
  3. Selecione ou defina quaisquer outras configurações da maneira que preferir e crie o cluster habilitado pelo escalador automático do Flink.

Esta seção aborda a maioria das configurações que você pode alterar com base nas suas necessidades específicas.

nota

Com configurações baseadas em tempo, como time, interval e window, a unidade padrão quando nenhuma unidade é especificada é milissegundos. Portanto, um valor de 30 sem sufixo é igual a 30 milissegundos. Para outras unidades de tempo, inclua o sufixo apropriado de s para segundos, m para minutos ou h para horas.

O escalador automático busca as métricas do nível do vértice do trabalho para cada intervalo de tempo configurável, converte-as em ações de escala, estima o novo paralelismo do vértice do trabalho e o recomenda ao agendador de trabalhos. As métricas são coletadas somente após o horário de reinício do trabalho e o intervalo de estabilização do cluster.

Chave de configuração Valor padrão Descrição Exemplos de valores
job.autoscaler.enabled false Habilita o ajuste de escala automático no cluster do Flink. true, false
job.autoscaler.decision.interval 60s Intervalo de decisões do escalador automático. 30 (a unidade padrão é milissegundos), 5m, 1h
job.autoscaler.restart.time 3m O tempo de reinício esperado a ser usado até que o operador possa determiná-lo de forma confiável com o histórico. 30 (a unidade padrão é milissegundos), 5m, 1h
job.autoscaler.stabilization.interval 300s O período de estabilização no qual nenhum novo ajuste de escala será executado. 30 (a unidade padrão é milissegundos), 5m, 1h
job.autoscaler.debug.logs.interval 300s Intervalo dos logs de depuração do escalador automático. 30 (a unidade padrão é milissegundos), 5m, 1h

O escalador automático busca as métricas, faz a agregação delas ao longo de uma janela deslizante com base no tempo e elas são avaliadas em decisões de ajuste de escala. O histórico de decisões do ajuste de escala para cada vértice do trabalho é utilizado para estimar novos paralelismos. Eles têm validade baseada na idade e no tamanho do histórico (pelo menos 1).

Chave de configuração Valor padrão Descrição Exemplos de valores
job.autoscaler.metrics.window 600s Scaling metrics aggregation window size. 30 (a unidade padrão é milissegundos), 5m, 1h
job.autoscaler.history.max.count 3 Número máximo de decisões de ajuste de escala anteriores a serem retidas por vértice. 1 para Integer.MAX_VALUE
job.autoscaler.history.max.age 24h Número mínimo de decisões de ajuste de escala anteriores a serem retidas por vértice. 30 (a unidade padrão é milissegundos), 5m, 1h

O paralelismo de cada vértice do trabalho é modificado com base na utilização de destino e vinculado aos limites mínimo e máximo do paralelismo. Não é recomendável definir uma utilização de destino próxima de 100% (ou seja, valor de 1), e o limite de utilização funciona como um buffer para lidar com as flutuações intermediárias da carga.

Chave de configuração Valor padrão Descrição Exemplos de valores
job.autoscaler.target.utilization 0.7 Utilização de destino do vértice. 0 - 1
job.autoscaler.target.utilization.boundary 0.4 Limite de utilização de destino do vértice. O ajuste de escala não será realizado se a taxa de processamento atual estiver entre [target_rate / (target_utilization - boundary) e (target_rate / (target_utilization + boundary)]. 0 - 1
job.autoscaler.vertex.min-parallelism 1 O paralelismo mínimo que o escalador automático pode usar. 0 - 200
job.autoscaler.vertex.max-parallelism 200 O paralelismo máximo que o escalador automático pode usar. Observe que esse limite será ignorado se ele for maior que o paralelismo máximo definido na configuração do Flink ou diretamente em cada operador. 0 - 200

O vértice do trabalho precisa de recursos extras para lidar com os eventos pendentes, ou backlogs, que se acumulam durante o período da operação de escala. Isso também é conhecido como duração de catch-up. Se o tempo de processamento do backlog exceder o valor de lag -threshold configurado, a utilização de destino do vértice do trabalho aumentará para o nível máximo. Isso ajuda a evitar operações desnecessárias de ajuste de escala durante o processamento do backlog.

Chave de configuração Valor padrão Descrição Exemplos de valores
job.autoscaler.backlog-processing.lag-threshold 5m Limite de atraso que evitará ajustes de escala desnecessários e removerá as mensagens pendentes responsáveis pelo atraso. 30 (a unidade padrão é milissegundos), 5m, 1h
job.autoscaler.catch-up.duration 15m A duração de destino para o processamento completo de qualquer backlog após uma operação de ajuste de escala. Defina como 0 para desabilitar o ajuste de escala baseado em backlog. 30 (a unidade padrão é milissegundos), 5m, 1h

O escalador automático não executa a operação de redução vertical de escala imediatamente após uma operação de aumento vertical de escala dentro do período de carência. Isso evita ciclos desnecessários de operações de aumento e redução vertical de escala causados por flutuações temporárias de carga.

Podemos usar a taxa de operação de redução vertical de escala para diminuir gradualmente o paralelismo e liberar recursos que atendam a picos de carga temporários. Isso também ajuda a evitar operações secundárias desnecessárias de aumento vertical de escala após uma grande operação de redução vertical de escala.

Podemos detectar uma operação ineficaz de aumento vertical de escala com base no histórico anterior de decisões de ajuste de escala de vértices de trabalho para evitar mais alterações no paralelismo.

Chave de configuração Valor padrão Descrição Exemplos de valores
job.autoscaler.scale-up.grace-period 1h Duração em que nenhuma redução vertical de escala de um vértice é permitida após ele ter sido aumentado. 30 (a unidade padrão é milissegundos), 5m, 1h
job.autoscaler.scale-down.max-factor 0.6 Fator máximo de redução vertical de escala. Um valor de 1 significa que não há limite na redução vertical de escala; 0.6 significa que o trabalho só pode ser reduzido verticalmente com 60% do paralelismo original. 0 - 1
job.autoscaler.scale-up.max-factor 100000. Taxa máxima de aumento vertical de escala. Um valor de 2.0 significa que o trabalho só pode aumentar a escala verticalmente com 200% do paralelismo atual. 0 - Integer.MAX_VALUE
job.autoscaler.scaling.effectiveness.detection.enabled false Se deve habilitar a detecção de operações ineficazes de ajuste de escala e permitir que o escalador automático bloqueie novos aumentos verticais. true, false