Configure Spark
You can configure Spark on Amazon EMR with configuration classifications.
Configuration classifications for Spark on Amazon EMR include the following:
- `spark` – Sets the `maximizeResourceAllocation` property to true or false. When true, Amazon EMR automatically configures `spark-defaults` properties based on cluster hardware configuration. For more information, see Using maximizeResourceAllocation.
- `spark-defaults` – Sets values in the `spark-defaults.conf` file. For more information, see Spark configuration in the Spark documentation.
- `spark-env` – Sets values in the `spark-env.sh` file. For more information, see Environment variables in the Spark documentation.
- `spark-hive-site` – Sets values in the `hive-site.xml` file for Spark.
- `spark-log4j` – (Amazon EMR releases 6.7.x and lower) Sets values in the `log4j.properties` file. For more information, see the log4j.properties.template file on GitHub.
- `spark-log4j2` – (Amazon EMR releases 6.8.0 and higher) Sets values in the `log4j2.properties` file. For more information, see the log4j2.properties.template file on GitHub.
- `spark-metrics` – Sets values in the `metrics.properties` file. For settings and more information, see the metrics.properties.template file on GitHub, and Metrics in the Spark documentation.
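You can combine several classifications in a single configuration list. As a hedged illustration (the property value and log directory below are examples, not Amazon EMR defaults), the following sets a Spark property through `spark-defaults` and exports an environment variable through the `export` sub-classification of `spark-env`:

```json
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.sql.shuffle.partitions": "400"
    }
  },
  {
    "Classification": "spark-env",
    "Configurations": [
      {
        "Classification": "export",
        "Properties": {
          "SPARK_LOG_DIR": "/var/log/spark-custom"
        }
      }
    ]
  }
]
```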
Note
If you're migrating Spark workloads to Amazon EMR from another platform, we recommend that you test your workloads with the Spark defaults set by Amazon EMR before you add custom configurations. Most customers see improved performance with our default settings.
Spark defaults set by Amazon EMR
The following table shows how Amazon EMR sets default values in `spark-defaults` that affect applications.
| Setting | Description | Default value |
| --- | --- | --- |
| `spark.executor.memory` | The amount of memory to use per executor process (for example, `1g` or `2g`). | Determined by the core and task instance types in the cluster. |
| `spark.executor.cores` | The number of cores to use on each executor. | Determined by the core and task instance types in the cluster. |
| `spark.dynamicAllocation.enabled` | When true, uses dynamic resource allocation to scale the number of executors registered with an application up and down based on the workload. | `true`. Note: The Spark shuffle service is automatically configured by Amazon EMR. |
| `spark.sql.hive.advancedPartitionPredicatePushdown.enabled` | When true, advanced partition predicate pushdown into the Hive metastore is enabled. | `true` |
| `spark.sql.hive.stringLikePartitionPredicatePushdown.enabled` | Pushes down string-matching partition predicates (such as `startsWith`, `contains`, and `endsWith`) into the Hive metastore. Note: Glue doesn't support predicate push down for these functions, so set this value to false if you use the AWS Glue Data Catalog and encounter related errors. | `true` |
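For example, if you use the AWS Glue Data Catalog as your metastore and encounter errors related to string predicate pushdown, one option (a sketch; adjust to your own needs) is to override the default in `spark-defaults`:

```json
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.sql.hive.stringLikePartitionPredicatePushdown.enabled": "false"
    }
  }
]
```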
Configuring Spark garbage collection on Amazon EMR 6.1.0
Setting custom garbage collection configurations with `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` results in driver or executor launch failure with Amazon EMR 6.1.0 because of a conflicting garbage collection configuration. For Amazon EMR 6.1.0, the default garbage collection configuration is set through `spark.driver.defaultJavaOptions` and `spark.executor.defaultJavaOptions`. This configuration applies only to Amazon EMR 6.1.0. JVM options not related to garbage collection, such as those for configuring logging (`-verbose:class`), can still be set through `extraJavaOptions`. For more information, see Spark application properties.
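For example, on Amazon EMR 6.1.0 you could place custom garbage collection flags in the default Java options and keep non-GC flags in `extraJavaOptions`. This is a minimal sketch; the specific G1GC flags shown are illustrative choices, not Amazon EMR defaults:

```json
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.driver.defaultJavaOptions": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200",
      "spark.executor.defaultJavaOptions": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200",
      "spark.driver.extraJavaOptions": "-verbose:class",
      "spark.executor.extraJavaOptions": "-verbose:class"
    }
  }
]
```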
Using maximizeResourceAllocation

To configure your executors to use the maximum resources possible on each node in a cluster, set `maximizeResourceAllocation` to `true` in your `spark` configuration classification. The `maximizeResourceAllocation` option is specific to Amazon EMR. When you enable `maximizeResourceAllocation`, Amazon EMR calculates the maximum compute and memory resources available for an executor on an instance in the core instance group. It then sets the corresponding `spark-defaults` settings based on the calculated maximum values.
For clusters that use instance fleets, Amazon EMR calculates the maximum compute and memory resources available for an executor based on an instance type from the core instance fleet. Because each instance fleet can contain different instance types and sizes, the executor configuration that Amazon EMR derives might not be the best fit for your cluster, so we don't recommend relying on the defaults when you use maximum resource allocation with instance fleets. Configure custom settings for your instance fleet clusters, as in the sketch that follows.
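As a hedged starting point for instance fleet clusters, you might set explicit executor resources in `spark-defaults` instead of enabling `maximizeResourceAllocation`. The values below are placeholders that you should size against the smallest instance type in your core fleet:

```json
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.executor.memory": "4g",
      "spark.executor.cores": "2",
      "spark.dynamicAllocation.enabled": "true"
    }
  }
]
```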
Note
You should not use the `maximizeResourceAllocation` option on clusters with other distributed applications like HBase. Amazon EMR uses custom YARN configurations for distributed applications, which can conflict with `maximizeResourceAllocation` and cause Spark applications to fail.
The following is an example Spark configuration classification with `maximizeResourceAllocation` set to `true`.

```json
[
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  }
]
```
Settings configured in spark-defaults when maximizeResourceAllocation is enabled

| Setting | Description | Value |
| --- | --- | --- |
| `spark.default.parallelism` | Default number of partitions in RDDs returned by transformations like `join`, `reduceByKey`, and `parallelize` when not set by the user. | 2x the number of CPU cores available to YARN containers. |
| `spark.driver.memory` | Amount of memory to use for the driver process, that is, where SparkContext is initialized (for example, `1g` or `2g`). | Configured based on the instance types in the cluster. However, because the Spark driver application may run on either the primary instance or one of the core instances (for example, in YARN client and cluster modes, respectively), this is set based on the smaller of the instance types in these two instance groups. |
| `spark.executor.memory` | Amount of memory to use per executor process (for example, `1g` or `2g`). | Configured based on the core and task instance types in the cluster. |
| `spark.executor.cores` | The number of cores to use on each executor. | Configured based on the core and task instance types in the cluster. |
| `spark.executor.instances` | The number of executors. | Configured based on the core and task instance types in the cluster. Set unless `spark.dynamicAllocation.enabled` is explicitly set to `true` at the same time. |
Configuring node decommissioning behavior
With Amazon EMR release 5.9.0 and higher, Spark on Amazon EMR includes a set of features to help ensure that Spark gracefully handles node termination because of a manual resize or an automatic scaling policy request. Amazon EMR implements a deny listing mechanism in Spark that is built on top of the YARN decommissioning mechanism. This mechanism helps ensure that no new tasks are scheduled on a node that is decommissioning, while at the same time allowing tasks that are already running to complete. In addition, there are features to help recover Spark jobs faster if shuffle blocks are lost when a node terminates. The recomputation process is triggered sooner and optimized to recompute faster with fewer stage retries, and jobs can be prevented from failing because of fetch failures that are caused by missing shuffle blocks.
Important
The `spark.decommissioning.timeout.threshold` setting was added in Amazon EMR release 5.11.0 to improve Spark resiliency when you use Spot Instances. In earlier releases, when a node uses a Spot Instance and the instance is terminated because of bid price, Spark may not be able to handle the termination gracefully. Jobs may fail, and shuffle recomputations could take a significant amount of time. For this reason, we recommend using release 5.11.0 or later if you use Spot Instances.
Spark node decommissioning settings

| Setting | Description | Default value |
| --- | --- | --- |
| `spark.blacklist.decommissioning.enabled` | When set to `true`, Spark deny lists nodes that are in the `decommissioning` state in YARN. Spark does not schedule new tasks on executors running on that node. Tasks already running are allowed to complete. | `true` |
| `spark.blacklist.decommissioning.timeout` | The amount of time that a node in the `decommissioning` state is deny listed. By default, this value is set to one hour, which is also the default for `yarn.resourcemanager.decommissioning.timeout`. To ensure that a node is deny listed for its entire decommissioning period, set this value equal to or greater than `yarn.resourcemanager.decommissioning.timeout`. After the timeout expires, the node transitions to a `decommissioned` state, and Amazon EMR can terminate the node's EC2 instance. Any tasks still running after the timeout expires are lost or killed and rescheduled on executors running on other nodes. | `1h` |
| `spark.decommissioning.timeout.threshold` | Available in Amazon EMR release 5.11.0 or later. Specified in seconds. When a node transitions to the `decommissioning` state, if the host will decommission within a time period equal to or less than this value, Amazon EMR not only deny lists the node, but also cleans up the host state (as specified by `spark.resourceManager.cleanupExpiredHost`) without waiting for the node to transition to a `decommissioned` state. This lets Spark handle Spot Instance terminations better, because Spot Instances decommission within a 2-minute timeout regardless of the value of `yarn.resourcemanager.decommissioning.timeout`. | `20` |
| `spark.resourceManager.cleanupExpiredHost` | When set to `true`, Spark unregisters all cached data and shuffle blocks stored in executors on nodes that are in the `decommissioned` state. This speeds up the recovery process. | `true` |
| `spark.stage.attempt.ignoreOnDecommissionFetchFailure` | When set to `true`, helps prevent Spark from failing stages and eventually failing jobs because of too many failed fetches from decommissioned nodes. Failed fetches of shuffle blocks from a node in the `decommissioned` state don't count toward the maximum number of consecutive fetch failures. | `true` |
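For example, to keep decommissioning nodes deny listed longer than the one-hour default (a sketch; match the value to your own `yarn.resourcemanager.decommissioning.timeout`), you could set:

```json
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.blacklist.decommissioning.timeout": "2h"
    }
  }
]
```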
Spark ThriftServer environment variable
Spark sets the Hive Thrift Server port environment variable, `HIVE_SERVER2_THRIFT_PORT`, to 10001.
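If your workload needs a different port, one hedged approach is to export the variable through the `export` sub-classification of the `spark-env` classification; the port value here is illustrative:

```json
[
  {
    "Classification": "spark-env",
    "Configurations": [
      {
        "Classification": "export",
        "Properties": {
          "HIVE_SERVER2_THRIFT_PORT": "10005"
        }
      }
    ]
  }
]
```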
Changing Spark default settings
You change the defaults in `spark-defaults.conf` using the `spark-defaults` configuration classification or the `maximizeResourceAllocation` setting in the `spark` configuration classification.
The following procedures show how to modify settings using the CLI or console.
To create a cluster with spark.executor.memory set to 2g using the CLI
- Create a cluster with Spark installed and `spark.executor.memory` set to 2g, using the following command, which references a file, `myConfig.json`, stored in Amazon S3.

  ```
  aws emr create-cluster --release-label emr-7.3.0 --applications Name=Spark \
  --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 \
  --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole \
  --configurations https://s3.amazonaws.com/amzn-s3-demo-bucket/myfolder/myConfig.json
  ```

  Note

  Linux line continuation characters (\) are included for readability. They can be removed or used in Linux commands. For Windows, remove them or replace with a caret (^).

  `myConfig.json`:

  ```json
  [
    {
      "Classification": "spark-defaults",
      "Properties": {
        "spark.executor.memory": "2G"
      }
    }
  ]
  ```
To create a cluster with spark.executor.memory set to 2g using the console
Navigate to the new Amazon EMR console and select Switch to the old console from the side navigation. For more information on what to expect when you switch to the old console, see Using the old console.
- Choose Create cluster, Go to advanced options.
- Choose Spark.
- Under Edit software settings, leave Enter configuration selected and enter the following configuration:

  `classification=spark-defaults,properties=[spark.executor.memory=2G]`

- Select other options as necessary, and then choose Create cluster.
To set maximizeResourceAllocation
- Create a cluster with Spark installed and `maximizeResourceAllocation` set to true using the AWS CLI, referencing a file, `myConfig.json`, stored in Amazon S3.

  ```
  aws emr create-cluster --release-label emr-7.3.0 --applications Name=Spark \
  --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 \
  --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole \
  --configurations https://s3.amazonaws.com/amzn-s3-demo-bucket/myfolder/myConfig.json
  ```

  Note

  Linux line continuation characters (\) are included for readability. They can be removed or used in Linux commands. For Windows, remove them or replace with a caret (^).

  `myConfig.json`:

  ```json
  [
    {
      "Classification": "spark",
      "Properties": {
        "maximizeResourceAllocation": "true"
      }
    }
  ]
  ```
Note
With Amazon EMR version 5.21.0 and later, you can override cluster configurations and specify additional configuration classifications for each instance group in a running cluster. You do this by using the Amazon EMR console, the AWS Command Line Interface (AWS CLI), or the AWS SDK. For more information, see Supplying a Configuration for an Instance Group in a Running Cluster.
Migrating from Apache Log4j 1.x to Log4j 2.x
Apache Spark releases 3.2.x and earlier use the legacy Apache Log4j 1.x and the `log4j.properties` file to configure Log4j in Spark processes. Apache Spark releases 3.3.0 and later use Apache Log4j 2.x and the `log4j2.properties` file to configure Log4j in Spark processes.
If you have configured Apache Spark Log4j using an Amazon EMR release lower than 6.8.0, then you must remove the legacy `spark-log4j` configuration classification and migrate to the `spark-log4j2` configuration classification and key format before you can upgrade to Amazon EMR 6.8.0 or later. The legacy `spark-log4j` classification causes cluster creation to fail with a `ValidationException` error in Amazon EMR releases 6.8.0 and later. You will not be charged for a failure related to the Log4j incompatibility, but you must remove the defunct `spark-log4j` configuration classification to continue.
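As a hedged illustration of the key-format change, a legacy Log4j 1.x root logger setting such as the following:

```json
[
  {
    "Classification": "spark-log4j",
    "Properties": {
      "log4j.rootLogger": "WARN, console"
    }
  }
]
```

might map to a Log4j 2.x classification like this (the exact keys depend on your logging configuration):

```json
[
  {
    "Classification": "spark-log4j2",
    "Properties": {
      "rootLogger.level": "warn"
    }
  }
]
```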
For more information about migrating from Apache Log4j 1.x to Log4j 2.x, see the Apache Log4j Migration Guide.

Note

With Amazon EMR, Apache Spark uses a `log4j2.properties` file rather than the .xml file described in the Apache Log4j Migration Guide.