Uso de configuraciones de Spark al ejecutar trabajos de EMR sin servidor
Puede ejecutar trabajos de Spark en una aplicación con el parámetro type
establecido en SPARK
. Los trabajos deben ser compatibles con la versión de Spark compatible con la versión de lanzamiento de Amazon EMR. Por ejemplo, cuando ejecuta trabajos en una aplicación con la versión 6.6.0 de Amazon EMR, su trabajo debe ser compatible con Apache Spark 3.2.0. Para obtener información sobre las versiones de la aplicación para cada versión, consulte Versiones lanzamiento de Amazon EMR sin servidor.
Parámetros de trabajos de Spark
Al utilizar la API StartJobRun
para ejecutar un trabajo de Spark, debe especificar los siguientes parámetros.
Parámetros necesarios
Rol de tiempo de ejecución de trabajos de Spark
Utilice executionRoleArn
para especificar el ARN del rol de IAM que la aplicación utiliza para ejecutar los trabajos de Spark. Este rol debe tener los siguientes permisos:
-
Lectura de los buckets de S3 u otras fuentes de datos en las que residen sus datos
-
Lectura de los buckets o prefijos de S3 donde reside tu script PySpark o archivo JAR
-
Escritura en los buckets de S3 donde desee escribir el resultado final
-
Escritura en los registros en un prefijo o bucket de S3 que especifique
S3MonitoringConfiguration
-
Acceso a las claves de KMS si usa claves de KMS para cifrar los datos en el bucket de S3
-
Acceso al catálogo de datos de Glue de AWS si usa SparkSQL
Si su trabajo de Spark lee o escribe datos en –o desde– otras fuentes de datos, especifique los permisos adecuados en este rol de IAM. Si no proporciona estos permisos al rol de IAM, es posible que el trabajo no funcione. Para obtener más información, consulte Roles en tiempo de ejecución de trabajo para Amazon EMR sin servidor y Almacenamiento de registros.
Parámetro del controlador de trabajos de Spark
Utilice jobDriver
para proporcionar datos al trabajo. El parámetro del controlador de trabajo solo acepta un valor para el tipo de trabajo que desee ejecutar. Para un trabajo de Spark, el valor del parámetro es sparkSubmit
. Puede usar este tipo de trabajo para ejecutar Scala, Java, PySpark, SparkR y cualquier otro trabajo compatible mediante Spark-submit. Los trabajos de Spark tienen los parámetros siguientes:
-
sparkSubmitParameters
: son los parámetros de Spark adicionales que desea enviar al trabajo. Use este parámetro para anular las propiedades predeterminadas de Spark, como la memoria del controlador o el número de ejecutores, como aquellos definidos en los argumentos--conf
o--class
. -
entryPointArguments
: se trata de un conjunto de argumentos que desea pasar a su archivo Python o JAR principal. Debería manejar la lectura de estos parámetros mediante su código de punto de entrada. Separe cada argumento del conjunto con una coma. -
entryPoint
: esta es la referencia en Amazon S3 al archivo Python o JAR principal que desea ejecutar. Si está ejecutando un JAR de Scala o Java, especifique la clase de entrada principal enSparkSubmitParameters
utilizando el argumento--class
.
Para obtener más información, consulte Launching Applications with spark-submit
Parámetro de anulación de configuración de Spark
Utilice configurationOverrides
para anular las propiedades de configuración a nivel de monitorización y de aplicación. Este parámetro acepta un objeto JSON con los dos campos siguientes:
-
monitoringConfiguration
: utilice este campo para especificar la URL de Amazon S3 (s3MonitoringConfiguration
) en la que quiere que el trabajo del EMR sin servidor almacene los registros de su trabajo de Spark. Asegúrese de crear este bucket con la misma Cuenta de AWS que aloja su aplicación y en la misma Región de AWS donde se ejecuta su trabajo. -
applicationConfiguration
: para anular las configuraciones predeterminadas de las aplicaciones, puede proporcionar un objeto de configuración en este campo. Puede utilizar una sintaxis abreviada para proporcionar la configuración o hacer referencia al objeto de configuración en un archivo JSON. Los objetos de configuración se componen de una clasificación, propiedades y configuraciones anidadas opcionales. Las propiedades consisten en las configuraciones que desea anular en ese archivo. Es posible especificar varias clasificaciones para varias aplicaciones en un solo objeto JSON.nota
Las clasificaciones de configuración disponibles varían en función de la versión específica de EMR sin servidor. Por ejemplo, las clasificaciones para el Log4j personalizado
spark-driver-log4j2
yspark-executor-log4j2
solo están disponibles en las versiones 6.8.0 y posteriores.
Si usa la misma configuración en una anulación de aplicación y en los parámetros de envío de Spark, los parámetros de envío de Spark tendrán prioridad. Las configuraciones se clasifican según prioridad de la siguiente manera, de mayor a menor:
-
Configuración que EMR sin servidor proporciona cuando crea
SparkSession
. -
Configuración que usted proporciona como parte de los
sparkSubmitParameters
con el argumento--conf
. -
Configuración que usted proporciona como parte de la aplicación que se anula al iniciar un trabajo.
-
Configuración que usted proporciona como parte de la
runtimeConfiguration
al crear una aplicación. -
Configuraciones optimizadas que Amazon EMR utiliza para la versión.
-
Configuraciones de código abierto predeterminadas para la aplicación.
Para obtener más información sobre la declaración de configuraciones a nivel de aplicación y la anulación de las configuraciones durante la ejecución de un trabajo, consulte Configuración predeterminada de aplicación para EMR sin servidor.
Optimización de asignación de recursos dinámicos de Spark
Utilice dynamicAllocationOptimization
para optimizar el uso de recursos en EMR sin servidor. Al establecer esta propiedad true
en la clasificación de configuración de Spark, EMR sin servidor debe optimizar la asignación de recursos de los ejecutores para alinear mejor la velocidad a la que Spark solicita y cancela ejecutores con la velocidad a la que EMR sin servidor crea y libera trabajadores. Al hacerlo, EMR sin servidor reutiliza de manera más óptima a los trabajadores en todas las etapas, lo que reduce los costes al ejecutar trabajos con varias etapas y, al mismo tiempo, mantiene el mismo rendimiento.
Esta propiedad está disponible en todas las versiones de lanzamiento de Amazon EMR.
A continuación se muestra una clasificación de configuración de ejemplo con dynamicAllocationOptimization
.
[ { "Classification": "spark", "Properties": { "dynamicAllocationOptimization": "true" } } ]
Tenga en cuenta lo siguiente si utiliza la optimización de asignación dinámica:
-
Esta optimización está disponible para los trabajos de Spark para los que ha activado la asignación dinámica de recursos.
-
Para lograr la mejor rentabilidad, te recomendamos configurar un límite de escalado superior para los trabajadores, utilizando la configuración a nivel de trabajo
spark.dynamicAllocation.maxExecutors
o la configuración de capacidad máxima a nivel de aplicación en función de su carga de trabajo. -
Es posible que no vea una mejora de costes en los trabajos más sencillos. Por ejemplo, si su trabajo se ejecuta en un conjunto de datos pequeño o termina de ejecutarse en una etapa, es posible que Spark no necesite un mayor número de ejecutores ni varios eventos de escalado.
-
Los trabajos con una secuencia de una etapa grande, etapas más pequeñas y, después, una etapa más grande podrían experimentar una regresión en el tiempo de ejecución del trabajo. Dado que EMR sin servidor utiliza los recursos de manera más eficiente, es posible que haya menos trabajadores disponibles para las etapas más grandes y, por lo tanto, un tiempo de ejecución más prolongado.
Propiedades de trabajo de Spark
La siguiente tabla enumera las propiedades opcionales de Spark y sus valores predeterminados que puede anular al enviar un trabajo de Spark.
Clave | Descripción | Valor predeterminado |
---|---|---|
spark.archives |
Lista separada por comas de archivos que Spark extrae en el directorio de trabajo de cada ejecutor. Los tipos de archivos admitidos son .jar ,
.tar.gz , .tgz y .zip . Para especificar el nombre del directorio que se va a extraer, añada # después del nombre del archivo que desee extraer. Por ejemplo, file.zip#directory . |
NULL |
spark.authenticate |
Opción que activa la autenticación de las conexiones internas de Spark. | TRUE |
spark.driver.cores |
La cantidad de núcleos que utiliza el controlador. | 4 |
spark.driver.extraJavaOptions |
Opciones de Java adicionales para el controlador Spark. | NULL |
spark.driver.memory |
La cantidad de memoria que utiliza el controlador. | 14 G |
spark.dynamicAllocation.enabled |
Opción que activa la asignación dinámica de recursos. Esta opción escala o reduce verticalmente el número de ejecutores registrados en la aplicación, en función de la carga de trabajo. | TRUE |
spark.dynamicAllocation.executorIdleTimeout |
El tiempo que un ejecutor puede permanecer inactivo antes de que Spark lo elimine. Esto solo se aplica si activa la asignación dinámica. | 60 s |
spark.dynamicAllocation.initialExecutors |
El número inicial de ejecutores que se ejecutarán si activa la asignación dinámica. | 3 |
spark.dynamicAllocation.maxExecutors |
El límite superior del número de ejecutores si activa la asignación dinámica. | Para 6.10.0 y versiones posteriores, Para 6.9.0 y versiones anteriores, |
spark.dynamicAllocation.minExecutors |
El límite superior del número de ejecutores si activa la asignación dinámica. | 0 |
spark.emr-serverless.allocation.batch.size |
El número de contenedores que se van a solicitar en cada ciclo de asignación de ejecutores. Hay un intervalo de un segundo entre cada ciclo de asignación. | 20 |
spark.emr-serverless.driver.disk |
El disco del controlador de Spark. | 20 G |
spark.emr-serverless.driverEnv. |
Opción que añade variables de entorno al controlador de Spark. | NULL |
spark.emr-serverless.executor.disk |
El disco ejecutor de Spark. | 20 G |
spark.emr-serverless.memoryOverheadFactor |
Establece la sobrecarga de memoria para añadirla a la memoria del contenedor del controlador y del ejecutor. | 0.1 |
spark.emr-serverless.driver.disk.type |
El tipo de disco conectado al controlador de Spark. | Estándar |
spark.emr-serverless.executor.disk.type |
El tipo de disco conectado a los ejecutores de Spark. | Estándar |
spark.executor.cores |
El número de núcleos que usa cada ejecutor. | 4 |
spark.executor.extraJavaOptions |
Opciones de Java adicionales para el ejecutor de Spark. | NULL |
spark.executor.instances |
El número de contenedores de ejecutores de Spark que se asignarán. | 3 |
spark.executor.memory |
La cantidad de memoria que utiliza cada ejecutor. | 14 G |
spark.executorEnv. |
Opción que añade variables de entorno a los ejecutores de Spark. | NULL |
spark.files |
Lista de archivos separados por comas que se colocarán en el directorio de trabajo de cada ejecutor. Puede acceder a las rutas de estos archivos en el ejecutor con SparkFiles.get( . |
NULL |
spark.hadoop.hive.metastore.client.factory.class |
La clase de implementación del metaalmacén de Hive. | NULL |
spark.jars |
Archivos jar adicionales para añadirlos a la ruta de clases de tiempo de ejecución del controlador y los ejecutores. | NULL |
spark.network.crypto.enabled |
Opción que activa el cifrado RPC basado en AES. Esto incluye el protocolo de autenticación agregado en Spark 2.2.0. | FALSE |
spark.sql.warehouse.dir |
La ubicación predeterminada para las bases de datos y tablas administradas. | El valor de $PWD/spark-warehouse |
spark.submit.pyFiles |
Una lista separada por comas de archivos .zip , .egg o
.py para colocarlos en la PYTHONPATH en las aplicaciones de Python. |
NULL |
En la tabla siguiente se enumeran los parámetros de envío predeterminados de Spark.
Clave | Descripción | Valor predeterminado |
---|---|---|
archives |
Lista separada por comas de archivos que Spark extrae en el directorio de trabajo de cada ejecutor. | NULL |
class |
La clase principal de la aplicación (para aplicaciones Java y Scala). | NULL |
conf |
Una propiedad de configuración arbitraria de Spark. | NULL |
driver-cores |
La cantidad de núcleos que utiliza el controlador. | 4 |
driver-memory |
La cantidad de memoria que utiliza el controlador. | 14 G |
executor-cores |
El número de núcleos que usa cada ejecutor. | 4 |
executor-memory |
La cantidad de memoria que utiliza el ejecutor. | 14 G |
files |
Lista de archivos separados por comas que se colocarán en el directorio de trabajo de cada ejecutor. Puede acceder a las rutas de estos archivos en el ejecutor con SparkFiles.get( . |
NULL |
jars |
Lista separada por comas de archivos jar para incluirlos en las rutas de clases del controlador y el ejecutor. | NULL |
num-executors |
El número de ejecutores que se van a lanzar. | 3 |
py-files |
Una lista separada por comas de archivos .zip , .egg o .py para colocarlos en la PYTHONPATH para las aplicaciones de Python. |
NULL |
verbose |
Opción que activa una salida de depuración adicional. | NULL |
Ejemplos de Spark
El siguiente ejemplo muestra cómo utilizar la API StartJobRun
para ejecutar un script de Python. Para obtener un tutorial integral en el que se utiliza este ejemplo, consulte Introducción a Amazon EMR sin servidor. Puede encontrar ejemplos adicionales de cómo ejecutar trabajos de PySpark y agregar dependencias de Python en el repositorio GitHub de Ejemplos de EMR sin servidor
aws emr-serverless start-job-run \ --application-id
application-id
\ --execution-role-arnjob-role-arn
\ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py", "entryPointArguments": ["s3://amzn-s3-demo-destination-bucket
/wordcount_output"], "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1" } }'
El siguiente ejemplo muestra cómo utilizar la API StartJobRun
para ejecutar un JAR de Spark.
aws emr-serverless start-job-run \ --application-id
application-id
\ --execution-role-arnjob-role-arn
\ --job-driver '{ "sparkSubmit": { "entryPoint": "/usr/lib/spark/examples/jars/spark-examples.jar", "entryPointArguments": ["1"], "sparkSubmitParameters": "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1" } }'