将 Delta Lake OSS 与 EMR Serverless 结合使用 - Amazon EMR

将 Delta Lake OSS 与 EMR Serverless 结合使用

Amazon EMR 6.9.0 及更高版本

注意

Amazon EMR 7.0.0 及更高版本使用 Delta Lake 3.0,该版本将 delta-core.jar 文件更名为 delta-spark.jar。如果您使用 Amazon EMR 7.0.0 或更高版本,请务必在配置中指定 delta-spark.jar

Amazon EMR 6.9.0 及更高版本包含 Delta Lake,因此无需再自行打包 Delta Lake 或为 EMR Serverless 作业提供 --packages 标志。

  1. 提交 EMR Serverless 作业时,请确保具有以下配置属性,并在 sparkSubmitParameters 字段中包含以下参数。

    --conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
  2. 创建本地 delta_sample.py 以测试创建和读取 Delta 表。

    # delta_sample.py from pyspark.sql import SparkSession import uuid url = "s3://amzn-s3-demo-bucket/delta-lake/output/%s/" % str(uuid.uuid4()) spark = SparkSession.builder.appName("DeltaSample").getOrCreate() ## creates a Delta table and outputs to target S3 bucket spark.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket spark.read.format("delta").load(url).show
  3. 使用 AWS CLI 将 delta_sample.py 文件上传到 Amazon S3 存储桶。然后使用 start-job-run 命令向现有的 EMR Serverless 应用程序提交作业。

    aws s3 cp delta_sample.py s3://amzn-s3-demo-bucket/code/ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn job-role-arn \ --name emr-delta \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://amzn-s3-demo-bucket/code/delta_sample.py", "sparkSubmitParameters": "--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" } }'

要将 Python 库与 Delta Lake 结合使用,可通过将其打包为依赖项将其用作自定义映像来添加 delta-core 库。

或者,您可以使用 SparkContext.addPyFiledelta-core JAR 文件添加 Python 库:

import glob from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() spark.sparkContext.addPyFile(glob.glob("/usr/share/aws/delta/lib/delta-core_*.jar")[0])

Amazon EMR 6.8.0 及更低版本

如果您使用的是 Amazon EMR 6.8.0 或更低版本,请按照以下步骤将 Delta Lake OSS 与 EMR Serverless 应用程序结合使用。

  1. 要构建与 Amazon EMR Serverless 应用程序上的 Spark 版本兼容的 Delta Lake 开源版本,请导航到 Delta GitHub 并按照说明操作。

  2. 将 Delta Lake 库上传到 AWS 账户 中的 Amazon S3 存储桶。

  3. 在应用程序配置中提交 EMR Serverless 作业时,请包含存储桶中现有的 Delta Lake JAR 文件。

    --conf spark.jars=s3://amzn-s3-demo-bucket/jars/delta-core_2.12-1.1.0.jar
  4. 为确保您可以读取和写入 Delta 表,请运行示例 Pyspark 测试。

    from pyspark import SparkConf, SparkContext from pyspark.sql import HiveContext, SparkSession import uuid conf = SparkConf() sc = SparkContext(conf=conf) sqlContext = HiveContext(sc) url = "s3://amzn-s3-demo-bucket/delta-lake/output/1.0.1/%s/" % str(uuid.uuid4()) ## creates a Delta table and outputs to target S3 bucket session.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket session.read.format("delta").load(url).show