Spark 职位 - Amazon EMR

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Spark 职位

您可以在type参数设置为的情况下在应用程序上运行 Spark 作业SPARK。任务必须与与亚马逊EMR发行版兼容的 Spark 版本兼容。例如,当你使用亚马逊EMR版本 6.6.0 运行作业时,你的任务必须与 Apache Spark 3.2.0 兼容。有关每个版本的应用程序版本的信息,请参阅Amazon EMR 无服务器发布版本

Spark 作业参数

使用运行 Spark 作业时,可以指定以下参数。StartJobRunAPI

Spark 作业运行时角色

用于executionRoleArn为应用程序ARN用于执行 Spark 作业的IAM角色指定。此角色必须包含以下权限:

  • 从您的数据所在的 S3 存储桶或其他数据源中读取

  • 从 PySpark 脚本或文件所在的 S3 存储桶或前缀中读取 JAR

  • 写入您打算在其中写入最终输出的 S3 存储桶

  • 将日志写入S3MonitoringConfiguration指定的 S3 存储桶或前缀

  • 如果您使用KMS密钥加密 S3 存储桶中的数据,则可以访问KMS密钥

  • 如果您使用 Spark,则可以访问 AWS Glue 数据目录 SQL

如果您的 Spark 作业向其他数据源读取或写入数据,请在此IAM角色中指定相应的权限。如果您不向IAM角色提供这些权限,则任务可能会失败。有关更多信息,请参阅Amazon EMR Serverless 的 Job 运行时角色存储日志

Spark 作业驱动程序参数

用于jobDriver为作业提供输入。对于要运行的作业类型,作业驱动程序参数仅接受一个值。对于 Spark 作业,参数值为sparkSubmit。您可以使用此作业类型通过 Spark 提交来运行 Scala、Java PySpark、、SparkR 以及任何其他支持的作业。Spark 作业具有以下参数:

  • sparkSubmitParameters— 这些是您要发送到任务的其他 Spark 参数。使用此参数可以覆盖默认的 Spark 属性,例如驱动程序内存或执行器数量,例如--conf--class参数中定义的属性。

  • entryPointArguments— 这是您要传递给主文件JAR或 Python 文件的一组参数。您应该使用 Entrypoint 代码来处理读取这些参数。用逗号分隔数组中的每个参数。

  • entryPoint— 这是 Amazon S3 中对你要运行的主文件JAR或 Python 文件的引用。如果您运行的是 Scala 或 JavaJAR,请SparkSubmitParameters使用--class参数在中指定主入口类。

有关更多信息,请参阅使用 spark-submit 启动应用程序

Spark 配置覆盖参数

configurationOverrides用于覆盖监控级别和应用程序级别的配置属性。此参数接受具有以下两个字段的JSON对象:

  • monitoringConfiguration-使用此字段指定您希望EMR无服务器作业存储您的 Spark 任务日志的 Amazon S3 URL (s3MonitoringConfiguration)。请确保您创建此存储桶时使用的存储桶与托管应用程序的存储桶相同,且运行任务的 AWS 区域 位置相同。 AWS 账户

  • applicationConfiguration— 要覆盖应用程序的默认配置,可以在此字段中提供配置对象。您可以使用速记语法来提供配置,也可以在文件中引用配置对象。JSON配置对象包含分类、属性和可选的嵌套配置。属性由您希望在该文件中覆盖的设置组成。您可以在单个JSON对象中为多个应用程序指定多个分类。

    注意

    可用的配置分类因特定的EMR无服务器版本而异。例如,自定义 Log4j spark-driver-log4j2 和的分类仅spark-executor-log4j2在 6.8.0 及更高版本中可用。

如果您在应用程序覆盖和 Spark 提交参数中使用相同的配置,则 Spark 提交参数优先。配置的优先级按以下顺序排列,从高到低:

  • EMR无服务器在创建SparkSession时提供的配置。

  • 您在--conf参数中sparkSubmitParameters提供的配置。

  • 当你启动作业时,你作为应用程序的一部分提供的配置会被覆盖。

  • 您在创建应用程序runtimeConfiguration时提供的配置。

  • 优化了 Amazon 在该版本中EMR使用的配置。

  • 应用程序的默认开源配置。

有关在应用程序级别声明配置以及在作业运行期间覆盖配置的更多信息,请参阅EMR无服务器的默认应用程序配置

Spark 动态资源分配优化

dynamicAllocationOptimization用于优化EMR无服务器中的资源使用情况。true在你的 Spark 配置分类中将此属性设置为表示EMR无服务器需要优化执行器资源分配,以便更好地将 Spark 请求和取消执行者的速率与 S EMR erverless 创建和释放工作人员的速率保持一致。通过这样做,EMRServerless 可以更优化地跨阶段重复使用工作人员,从而在运行多个阶段的作业时降低成本,同时保持相同的性能。

此属性在所有 Amazon EMR 发行版本中都可用。

以下是配置分类示例dynamicAllocationOptimization

[ { "Classification": "spark", "Properties": { "dynamicAllocationOptimization": "true" } } ]

如果您使用的是动态分配优化,请考虑以下几点:

  • 此优化适用于已启用动态资源分配的 Spark 作业。

  • 为了实现最佳成本效益,我们建议根据您的工作负载使用作业级别设置spark.dynamicAllocation.maxExecutors应用程序级最大容量设置,为工作人员配置扩展上限。

  • 在更简单的工作中,你可能看不到成本的提高。例如,如果您的作业在小型数据集上运行或在一个阶段完成运行,则 Spark 可能不需要更多的执行器或多个扩展事件。

  • 具有大阶段、较小阶段然后又是大阶段的作业可能会在作业运行时出现回归。随着 EMR Serverless 更有效地使用资源,这可能会导致较大阶段的可用工作人员减少,从而延长运行时间。

Spark 作业属性

下表列出了可选的 Spark 属性及其默认值,您可以在提交 Spark 作业时覆盖这些属性。

描述 默认值
spark.archives 以逗号分隔的档案列表,Spark 将其提取到每个执行者的工作目录中。支持的文件类型包括.jar .tar.gz.tgz.zip。要指定要提取的目录名,请在要提取的文件名#后面添加。例如,file.zip#directory NULL
spark.authenticate 用于开启 Spark 内部连接身份验证的选项。 TRUE
spark.driver.cores 驱动程序使用的内核数。 4
spark.driver.extraJavaOptions Spark 驱动程序的额外 Java 选项。 NULL
spark.driver.memory 驱动程序使用的内存量。 14G
spark.dynamicAllocation.enabled 开启动态资源分配的选项。此选项可根据工作负载增加或缩小在应用程序中注册的执行者数量。 TRUE
spark.dynamicAllocation.executorIdleTimeout 在 Spark 将其移除之前,执行者可以保持空闲状态的时间长度。这仅在您开启动态分配时适用。 60
spark.dynamicAllocation.initialExecutors 开启动态分配时要运行的初始执行者数量。 3
spark.dynamicAllocation.maxExecutors 如果您启用动态分配,则执行者数量的上限。

对于 6.10.0 及更高版本,infinity

对于 6.9.0 及更低版本,100

spark.dynamicAllocation.minExecutors 如果您启用动态分配,则执行者数量的下限。 0
spark.emr-serverless.allocation.batch.size 每个执行人分配周期中要请求的容器数量。每个分配周期之间有一秒钟的间隔。 20
spark.emr-serverless.driver.disk Spark 驱动程序磁盘。 20G
spark.emr-serverless.driverEnv.[KEY] 将环境变量添加到 Spark 驱动程序的选项。 NULL
spark.emr-serverless.executor.disk Spark 执行器磁盘。 20G
spark.emr-serverless.memoryOverheadFactor 设置要添加到驱动程序和执行器容器内存中的内存开销。 0.1
spark.emr-serverless.driver.disk.type 连接到 Spark 驱动程序的磁盘类型。 Standard
spark.emr-serverless.executor.disk.type 连接到 Spark 执行程序的磁盘类型。 Standard
spark.executor.cores 每个执行器使用的内核数。 4
spark.executor.extraJavaOptions Spark 执行器的额外 Java 选项。 NULL
spark.executor.instances 要分配的 Spark 执行器容器的数量。 3
spark.executor.memory 每个执行器使用的内存量。 14G
spark.executorEnv.[KEY] 向 Spark 执行器添加环境变量的选项。 NULL
spark.files 每个执行器工作目录中要存放的文件列表,以逗号分隔。您可以使用访问执行器中这些文件的文件路径。SparkFiles.get(fileName) NULL
spark.hadoop.hive.metastore.client.factory.class Hive 元数据仓实现类。 NULL
spark.jars 要添加到驱动程序和执行程序的运行时类路径中的其他 jar。 NULL
spark.network.crypto.enabled 开启AES基于RPC加密的选项。这包括在 Spark 2.2.0 中添加的身份验证协议。 FALSE
spark.sql.warehouse.dir 托管数据库和表的默认位置。 $PWD/spark-warehouse
spark.submit.pyFiles 要放置在 PYTHONPATH Python 应用程序中的.zip.egg、或 .py文件的逗号分隔列表。 NULL

下表列出了默认的 Spark 提交参数。

描述 默认值
archives 以逗号分隔的档案列表,Spark 将其提取到每个执行者的工作目录中。 NULL
class 应用程序的主类(适用于 Java 和 Scala 应用程序)。 NULL
conf 一个任意 Spark 配置属性。 NULL
driver-cores 驱动程序使用的内核数。 4
driver-memory 驱动程序使用的内存量。 14G
executor-cores 每个执行器使用的内核数。 4
executor-memory 执行程序使用的内存量。 14G
files 要放置在每个执行器工作目录中的文件列表,以逗号分隔。您可以使用访问执行器中这些文件的文件路径。SparkFiles.get(fileName) NULL
jars 要包含在驱动程序和执行器类路径中的 jar 的逗号分隔列表。 NULL
num-executors 要启动的执行者数量。 3
py-files 要放置在 PYTHONPATH Python 应用程序上的.zip.egg、或.py文件的逗号分隔列表。 NULL
verbose 开启其他调试输出的选项。 NULL

火花示例

以下示例说明如何使用运行 Python 脚本。StartJobRun API有关使用此示例的 end-to-end教程,请参阅开始使用 Amazon EMR Serverless。您可以在EMR无服务器示例 GitHub 存储库中找到有关如何运行 PySpark 作业和添加 Python 依赖关系的其他示例

aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn job-role-arn \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py", "entryPointArguments": ["s3://amzn-s3-demo-destination-bucket/wordcount_output"], "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1" } }'

以下示例说明如何使用运行 Spark JAR。StartJobRun API

aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn job-role-arn \ --job-driver '{ "sparkSubmit": { "entryPoint": "/usr/lib/spark/examples/jars/spark-examples.jar", "entryPointArguments": ["1"], "sparkSubmitParameters": "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1" } }'