Using the RAPIDS Accelerator for Apache Spark with Amazon EMR on EKS

With Amazon EMR on EKS, you can run jobs for the Nvidia RAPIDS Accelerator for Apache Spark. This tutorial covers how to run a Spark job with RAPIDS on EC2 graphics processing unit (GPU) instance types. The tutorial uses the following versions:

  • Amazon EMR on EKS release 6.9.0 and later

  • Apache Spark 3.x

With the Nvidia RAPIDS Accelerator for Apache Spark plugin, you can accelerate Spark using Amazon EC2 GPU instance types. When you use these technologies together, you accelerate your data science pipelines without having to make any code changes. This reduces the runtime needed for data processing and model training. By getting more done in less time, you spend less on the cost of infrastructure.

Before you begin, make sure you have the following resources.

  • Amazon EMR on EKS virtual cluster

  • Amazon EKS cluster with a GPU-enabled node group

An Amazon EMR on EKS virtual cluster is a registered handle to a Kubernetes namespace on an Amazon EKS cluster, and is managed by Amazon EMR on EKS. The handle allows Amazon EMR to use the Kubernetes namespace as a destination for running jobs. For more information on how to set up a virtual cluster, see Setting up Amazon EMR on EKS in this guide.
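
For reference, registering a virtual cluster against an existing namespace can look like the following sketch, which uses the `aws emr-containers create-virtual-cluster` command. Every value shown is a placeholder that you would substitute for your own environment:

```shell
# Register a virtual cluster that maps to an existing Kubernetes
# namespace on an EKS cluster. All names below are placeholders.
aws emr-containers create-virtual-cluster \
  --name MY_VIRTUAL_CLUSTER_NAME \
  --container-provider '{
    "id": "EKS_CLUSTER_NAME",
    "type": "EKS",
    "info": {
      "eksInfo": {
        "namespace": "NAMESPACE_NAME"
      }
    }
  }'
```

The command returns the virtual cluster ID, which you later pass to `start-job-run` as `--virtual-cluster-id`.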

You must configure the Amazon EKS cluster with a node group that has GPU instances, and you must configure the nodes with a Nvidia device plugin. See managed node groups to learn more.

To configure your Amazon EKS cluster to add GPU-enabled node groups, perform the following procedure:

To add a GPU-enabled node group
  1. Create a GPU-enabled node group with the following create-nodegroup command. Be sure to substitute the correct parameters for your Amazon EKS cluster. Use an instance type that supports Spark RAPIDS, such as P4, P3, G5 or G4dn.

    aws eks create-nodegroup \
      --cluster-name EKS_CLUSTER_NAME \
      --nodegroup-name NODEGROUP_NAME \
      --scaling-config minSize=0,maxSize=5,desiredSize=2 CHOOSE_APPROPRIATELY \
      --ami-type AL2_x86_64_GPU \
      --node-role NODE_ROLE \
      --subnets SUBNETS_SPACE_DELIMITED \
      --remote-access ec2SshKey=SSH_KEY \
      --instance-types GPU_INSTANCE_TYPE \
      --disk-size DISK_SIZE \
      --region AWS_REGION
  2. Install the Nvidia device plugin on the cluster to emit the number of GPUs on each node of the cluster, and to run GPU-enabled containers in the cluster. Run the following command to install the plugin:

    kubectl apply -f https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/v0.9.0/nvidia-device-plugin.yml
  3. To validate the number of available GPUs on each node of your cluster, run the following command:

    kubectl get nodes "-o=custom-columns=NAME:.metadata.name,GPU:.status.allocatable.nvidia\.com/gpu"
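
With the two-node GPU group created above, the output of that command would look something like the following (the node names and counts here are illustrative, not output from a real cluster):

```
NAME                            GPU
ip-192-168-10-01.ec2.internal   1
ip-192-168-10-02.ec2.internal   1
```

A `<none>` value in the GPU column indicates that the node either has no GPU or that the Nvidia device plugin has not registered it yet.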
To run a Spark RAPIDS job
  1. Submit a Spark RAPIDS job to your Amazon EMR on EKS cluster. The following code shows an example of a command to start the job. The first time you run the job, it might take a few minutes to download the image and cache it on the node.

    aws emr-containers start-job-run \
      --virtual-cluster-id VIRTUAL_CLUSTER_ID \
      --execution-role-arn JOB_EXECUTION_ROLE \
      --release-label emr-6.9.0-spark-rapids-latest \
      --job-driver '{"sparkSubmitJobDriver": {"entryPoint": "local:///usr/lib/spark/examples/jars/spark-examples.jar","entryPointArguments": ["10000"], "sparkSubmitParameters":"--class org.apache.spark.examples.SparkPi "}}' \
      --configuration-overrides '{"applicationConfiguration": [{"classification": "spark-defaults","properties": {"spark.executor.instances": "2","spark.executor.memory": "2G"}}],"monitoringConfiguration": {"cloudWatchMonitoringConfiguration": {"logGroupName": "LOG_GROUP_NAME"},"s3MonitoringConfiguration": {"logUri": "LOG_GROUP_STREAM"}}}'
  2. To validate that the Spark RAPIDS Accelerator is enabled, check the Spark driver logs. These logs are stored in CloudWatch or in the S3 location that you specify when you run the start-job-run command. The following example shows roughly what the log lines look like:

    22/11/15 00:12:44 INFO RapidsPluginUtils: RAPIDS Accelerator build: {version=22.08.0-amzn-0, user=release, url=, date=2022-11-03T03:32:45Z, revision=, cudf_version=22.08.0, branch=}
    22/11/15 00:12:44 INFO RapidsPluginUtils: RAPIDS Accelerator JNI build: {version=22.08.0, user=, url=https://github.com/NVIDIA/spark-rapids-jni.git, date=2022-08-18T04:14:34Z, revision=a1b23cd_sample, branch=HEAD}
    22/11/15 00:12:44 INFO RapidsPluginUtils: cudf build: {version=22.08.0, user=, url=https://github.com/rapidsai/cudf.git, date=2022-08-18T04:14:34Z, revision=a1b23ce_sample, branch=HEAD}
    22/11/15 00:12:44 WARN RapidsPluginUtils: RAPIDS Accelerator 22.08.0-amzn-0 using cudf 22.08.0.
    22/11/15 00:12:44 WARN RapidsPluginUtils: spark.rapids.sql.multiThreadedRead.numThreads is set to 20.
    22/11/15 00:12:44 WARN RapidsPluginUtils: RAPIDS Accelerator is enabled, to disable GPU support set `spark.rapids.sql.enabled` to false.
    22/11/15 00:12:44 WARN RapidsPluginUtils: spark.rapids.sql.explain is set to `NOT_ON_GPU`. Set it to 'NONE' to suppress the diagnostics logging about the query placement on the GPU.
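
As the last log line above indicates, you can switch GPU support off without removing the plugin, which is useful for comparing a run against CPU-only execution. A minimal sketch of the spark-defaults classification fragment, assuming the same applicationConfiguration layout used in the start-job-run example:

```
{
  "classification": "spark-defaults",
  "properties": {
    "spark.rapids.sql.enabled": "false"
  }
}
```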
  3. To see the operations that will run on the GPU, perform the following steps to turn on extra logging. Note the "spark.rapids.sql.explain : ALL" configuration.

    aws emr-containers start-job-run \
      --virtual-cluster-id VIRTUAL_CLUSTER_ID \
      --execution-role-arn JOB_EXECUTION_ROLE \
      --release-label emr-6.9.0-spark-rapids-latest \
      --job-driver '{"sparkSubmitJobDriver": {"entryPoint": "local:///usr/lib/spark/examples/jars/spark-examples.jar","entryPointArguments": ["10000"], "sparkSubmitParameters":"--class org.apache.spark.examples.SparkPi "}}' \
      --configuration-overrides '{"applicationConfiguration": [{"classification": "spark-defaults","properties": {"spark.rapids.sql.explain":"ALL","spark.executor.instances": "2","spark.executor.memory": "2G"}}],"monitoringConfiguration": {"cloudWatchMonitoringConfiguration": {"logGroupName": "LOG_GROUP_NAME"},"s3MonitoringConfiguration": {"logUri": "LOG_GROUP_STREAM"}}}'

    The previous command is an example of a job that uses the GPU. Its output would look something like the example below. See this key for help with understanding the output:

    • * – marks an operation that works on the GPU

    • ! – marks an operation that can't run on the GPU

    • @ – marks an operation that works on the GPU, but won't run because it's inside a plan that can't run on the GPU

     22/11/15 01:22:58 INFO GpuOverrides: Plan conversion to the GPU took 118.64 ms
     22/11/15 01:22:58 INFO GpuOverrides: Plan conversion to the GPU took 4.20 ms
     22/11/15 01:22:58 INFO GpuOverrides: GPU plan transition optimization took 8.37 ms
     22/11/15 01:22:59 WARN GpuOverrides:
        *Exec <ProjectExec> will run on GPU
          *Expression <Alias> substring(cast(date#149 as string), 0, 7) AS month#310 will run on GPU
            *Expression <Substring> substring(cast(date#149 as string), 0, 7) will run on GPU
              *Expression <Cast> cast(date#149 as string) will run on GPU
          *Exec <SortExec> will run on GPU
            *Expression <SortOrder> date#149 ASC NULLS FIRST will run on GPU
            *Exec <ShuffleExchangeExec> will run on GPU
              *Partitioning <RangePartitioning> will run on GPU
                *Expression <SortOrder> date#149 ASC NULLS FIRST will run on GPU
              *Exec <UnionExec> will run on GPU
                !Exec <ProjectExec> cannot run on GPU because not all expressions can be replaced
                  @Expression <AttributeReference> customerID#0 could run on GPU
                  @Expression <Alias> Charge AS kind#126 could run on GPU
                    @Expression <Literal> Charge could run on GPU
                  @Expression <AttributeReference> value#129 could run on GPU
                  @Expression <Alias> add_months(2022-11-15, cast(-(cast(_we0#142 as bigint) + last_month#128L) as int)) AS date#149 could run on GPU
                    ! <AddMonths> add_months(2022-11-15, cast(-(cast(_we0#142 as bigint) + last_month#128L) as int)) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.AddMonths
                      @Expression <Literal> 2022-11-15 could run on GPU
                      @Expression <Cast> cast(-(cast(_we0#142 as bigint) + last_month#128L) as int) could run on GPU
                        @Expression <UnaryMinus> -(cast(_we0#142 as bigint) + last_month#128L) could run on GPU
                          @Expression <Add> (cast(_we0#142 as bigint) + last_month#128L) could run on GPU
                            @Expression <Cast> cast(_we0#142 as bigint) could run on GPU
                              @Expression <AttributeReference> _we0#142 could run on GPU
                            @Expression <AttributeReference> last_month#128L could run on GPU