Ajuste automático de los parámetros del escalador automático
En esta sección se describe el comportamiento del ajuste automático de varias versiones de Amazon EMR. También trata detalles sobre las diferentes configuraciones de escalado automático.
nota
Amazon EMR 7.2.0 y versiones posteriores utilizan la configuración de código abierto job.autoscaler.restart.time-tracking.enabled
para permitir la estimación del tiempo de reescalado. La estimación del tiempo de reescalado tiene la misma funcionalidad que el ajuste automático de Amazon EMR, por lo que no es necesario asignar manualmente valores empíricos al tiempo de reinicio.
Puede seguir utilizando el escalado automático de Amazon EMR, si utiliza las versiones 7.1.0 o anteriores de Amazon EMR.
- 7.2.0 and higher
-
Amazon EMR 7.2.0 y versiones posteriores miden el tiempo real de reinicio necesario para aplicar las decisiones de escalado automático. En las versiones 7.1.0 y anteriores, tenía que usar la configuración
job.autoscaler.restart.time
para configurar manualmente el tiempo máximo de reinicio estimado. Al usar la configuraciónjob.autoscaler.restart.time-tracking.enabled
, solo necesita introducir una hora de reinicio para el primer escalado. Después, el operador registra el tiempo de reinicio real y lo usará para los escalados posteriores.Para habilitar este seguimiento, utilice el siguiente comando:
job.autoscaler.restart.time-tracking.enabled: true
Las siguientes son las configuraciones relacionadas para la estimación del tiempo de reescalado.
Configuración Obligatoria Predeterminado Descripción job.autoscaler.restart.time-tracking.enabled No False Indica si el escalador automático de Flink debe ajustar automáticamente las configuraciones a lo largo del tiempo para optimizar las decisiones de escalado. Tenga en cuenta que el escalador automático solo puede ajustar automáticamente el parámetro restart.time
del escalador automático.job.autoscaler.restart.time No 5 m El tiempo de reinicio esperado que utiliza Amazon EMR en EKS hasta que el operador pueda determinar el tiempo de reinicio real a partir de las escalados anteriores. job.autoscaler.restart.time-tracking.enabled No 15 m El tiempo máximo de reinicio observado cuando job.autoscaler.restart.time-tracking.enabled
está establecido entrue
.El siguiente es un ejemplo de especificación de implementación que puede utilizar para probar la estimación del tiempo de reescalado:
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: autoscaling-example spec: flinkVersion: v1_18 flinkConfiguration: # Autoscaler parameters job.autoscaler.enabled: "true" job.autoscaler.scaling.enabled: "true" job.autoscaler.stabilization.interval: "5s" job.autoscaler.metrics.window: "1m" job.autoscaler.restart.time-tracking.enabled: "true" job.autoscaler.restart.time: "2m" job.autoscaler.restart.time-tracking.limit: "10m" jobmanager.scheduler: adaptive taskmanager.numberOfTaskSlots: "1" pipeline.max-parallelism: "12" executionRoleArn:
<JOB ARN>
emrReleaseLabel: emr-7.3.0-flink-latest jobManager: highAvailabilityEnabled: false storageDir: s3://<s3_bucket>
/flink/autoscaling/ha/ replicas: 1 resource: memory: "1024m" cpu: 0.5 taskManager: resource: memory: "1024m" cpu: 0.5 job: jarURI: s3://<s3_bucket>
/some-job-with-back-pressure parallelism: 1 upgradeMode: statelessPara simular la contrapresión, utilice la siguiente especificación de implementación.
job: jarURI: s3://
<s3_bucket>
/pyflink-script.py entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-script.py"] parallelism: 1 upgradeMode: statelessCargue el siguiente script de Python en el bucket de S3.
import logging import sys import time import random from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment TABLE_NAME="orders" QUERY=f""" CREATE TABLE {TABLE_NAME} ( id INT, order_time AS CURRENT_TIMESTAMP, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS ) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='100' ); """ def create_backpressure(i): time.sleep(2) return i def autoscaling_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.execute_sql(QUERY) res_table = t_env.from_path(TABLE_NAME) stream = t_env.to_data_stream(res_table) \ .shuffle().map(lambda x: create_backpressure(x))\ .print() env.execute("Autoscaling demo") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") autoscaling_demo()
Para comprobar que la estimación del tiempo de reescalado funciona, asegúrese de que el registro de nivel
DEBUG
del operador Flink esté habilitado. En el siguiente ejemplo se muestra cómo actualizar el archivovalues.yaml
del gráfico de Helm. A continuación, vuelva a instalar el gráfico de Helm actualizado y vuelva a ejecutar el trabajo de Flink.log4j-operator.properties: |+ # Flink Operator Logging Overrides rootLogger.level = DEBUG
Obtenga el nombre de tu pod líder.
ip=$(kubectl get configmap -n $NAMESPACE
<job-name>
-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"Ejecute el siguiente comando para obtener el tiempo de reinicio real utilizado en las evaluaciones de las métricas.
kubectl logs
<FLINK-OPERATOR-POD-NAME>
-c flink-kubernetes-operator -n<OPERATOR-NAMESPACE>
-f | grep "Restart time used in scaling summary computation"Deben aparecer registros similares a los siguientes. Tenga en cuenta que solo el primer escalado utiliza
job.autoscaler.restart.time
. Los escalados posteriores utilizan el tiempo de reinicio observado.2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M 2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
- 7.0.0 and 7.1.0
-
El escalador automático Flink integrado de código abierto utiliza numerosas métricas para tomar las mejores decisiones de escalado. Sin embargo, los valores predeterminados que utiliza para sus cálculos están pensados para ser aplicables a la mayoría de las cargas de trabajo y es posible que no sean óptimos para un trabajo determinado. La característica de ajuste automático añadida a la versión Amazon EMR en EKS del Flink Operator analiza las tendencias históricas observadas en métricas capturadas específicas y, en consecuencia, trata de calcular el valor más óptimo adaptado al trabajo en cuestión.
Configuración Obligatoria Predeterminado Descripción kubernetes.operator.job.autoscaler.autotune.enable No False Indica si el escalador automático de Flink debe ajustar automáticamente las configuraciones a lo largo del tiempo para optimizar las decisiones de escalado de los escaladores automáticos. Actualmente, el escalador automático solo puede ajustar automáticamente el parámetro restart.time
del escalador automático.kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count No 3 Indica cuántas métricas históricas de Amazon EMR en EKS guarda el escalador automático en el mapa de configuración de métricas de Amazon EMR en EKS. kubernetes.operator.job.autoscaler.autotune.metrics.restart.count No 3 Indica el número de reinicios que realiza el escalador automático antes de empezar a calcular el tiempo medio de reinicio de un trabajo determinado. Para habilitar el ajuste automático, debe haber completado lo siguiente:
-
Establezca
kubernetes.operator.job.autoscaler.autotune.enable:
entrue
-
Establezca
metrics.job.status.enable:
enTOTAL_TIME
-
Se sigue la configuración del Uso del escalador automático para las aplicaciones de Flink para habilitar el escalado automático.
El siguiente es un ejemplo de especificación de implementación que puede utilizar para probar el ajuste automático.
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: autoscaling-example spec: flinkVersion: v1_18 flinkConfiguration: # Autotuning parameters kubernetes.operator.job.autoscaler.autotune.enable: "true" kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2" kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1" metrics.job.status.enable: TOTAL_TIME # Autoscaler parameters kubernetes.operator.job.autoscaler.enabled: "true" kubernetes.operator.job.autoscaler.scaling.enabled: "true" kubernetes.operator.job.autoscaler.stabilization.interval: "5s" kubernetes.operator.job.autoscaler.metrics.window: "1m" jobmanager.scheduler: adaptive taskmanager.numberOfTaskSlots: "1" state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/ state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/ pipeline.max-parallelism: "4" executionRoleArn: <JOB ARN> emrReleaseLabel: emr-6.14.0-flink-latest jobManager: highAvailabilityEnabled: true storageDir: s3://<S3_bucket>/flink/autoscaling/ha/ replicas: 1 resource: memory: "1024m" cpu: 0.5 taskManager: resource: memory: "1024m" cpu: 0.5 job: jarURI: s3://<S3_bucket>/some-job-with-back-pressure parallelism: 1 upgradeMode: last-state
Para simular la contrapresión, utilice la siguiente especificación de implementación.
job: jarURI: s3://<S3_bucket>/pyflink-script.py entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-script.py"] parallelism: 1 upgradeMode: last-state
Cargue el siguiente script de Python en el bucket de S3.
import logging import sys import time import random from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment TABLE_NAME="orders" QUERY=f""" CREATE TABLE {TABLE_NAME} ( id INT, order_time AS CURRENT_TIMESTAMP, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS ) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='100' ); """ def create_backpressure(i): time.sleep(2) return i def autoscaling_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.execute_sql(QUERY) res_table = t_env.from_path(TABLE_NAME) stream = t_env.to_data_stream(res_table) \ .shuffle().map(lambda x: create_backpressure(x))\ .print() env.execute("Autoscaling demo") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") autoscaling_demo()
Para comprobar que el ajustador automático funciona, utilice los siguientes comandos. Tenga en cuenta que debe utilizar la información de su propio pod líder para el Flink Operator.
Obtenga primero el nombre de su pod líder.
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
Una vez que tenga el nombre de su pod líder, puede ejecutar el siguiente comando.
kubectl logs -n $NAMESPACE -c flink-kubernetes-operator --follow <
YOUR-FLINK-OPERATOR-POD-NAME
> | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'Deben aparecer registros similares a los siguientes.
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1)) [m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S
-