Menggunakan Delta Lake OSS dengan Serverless EMR - Amazon EMR

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan Delta Lake OSS dengan Serverless EMR

Amazon EMR versi 6.9.0 dan lebih tinggi

catatan

Amazon EMR 7.0.0 dan yang lebih tinggi menggunakan Delta Lake 3.0.0, yang mengganti nama file menjadi. delta-core.jar delta-spark.jar Jika Anda menggunakan Amazon EMR 7.0.0 atau yang lebih tinggi, pastikan untuk menentukan delta-spark.jar dalam konfigurasi Anda.

Amazon EMR 6.9.0 dan yang lebih tinggi termasuk Delta Lake, jadi Anda tidak perlu lagi mengemas Delta Lake sendiri atau memberikan --packages bendera dengan pekerjaan Tanpa Server Anda. EMR

  1. Saat Anda mengirimkan pekerjaan EMR Tanpa Server, pastikan Anda memiliki properti konfigurasi berikut dan sertakan parameter berikut di sparkSubmitParameters bidang.

    --conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
  2. Buat lokal delta_sample.py untuk menguji pembuatan dan membaca tabel Delta.

    # delta_sample.py from pyspark.sql import SparkSession import uuid url = "s3://amzn-s3-demo-bucket/delta-lake/output/%s/" % str(uuid.uuid4()) spark = SparkSession.builder.appName("DeltaSample").getOrCreate() ## creates a Delta table and outputs to target S3 bucket spark.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket spark.read.format("delta").load(url).show
  3. Dengan menggunakan AWS CLI, unggah delta_sample.py file ke bucket Amazon S3 Anda. Kemudian gunakan start-job-run perintah untuk mengirimkan pekerjaan ke aplikasi EMR Tanpa Server yang ada.

    aws s3 cp delta_sample.py s3://amzn-s3-demo-bucket/code/ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn job-role-arn \ --name emr-delta \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://amzn-s3-demo-bucket/code/delta_sample.py", "sparkSubmitParameters": "--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" } }'

Untuk menggunakan pustaka Python dengan Delta Lake, Anda dapat menambahkan delta-core pustaka dengan mengemasnya sebagai dependensi atau dengan menggunakannya sebagai gambar khusus.

Atau, Anda dapat menggunakan SparkContext.addPyFile untuk menambahkan pustaka Python dari file: delta-core JAR

import glob from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() spark.sparkContext.addPyFile(glob.glob("/usr/share/aws/delta/lib/delta-core_*.jar")[0])

Amazon EMR versi 6.8.0 dan lebih rendah

Jika Anda menggunakan Amazon EMR 6.8.0 atau yang lebih rendah, ikuti langkah-langkah berikut untuk menggunakan Delta Lake OSS dengan aplikasi Tanpa Server AndaEMR.

  1. Untuk membuat Delta Lake versi open source yang kompatibel dengan versi Spark di aplikasi Amazon EMR Serverless Anda, navigasikan ke Delta GitHub dan ikuti petunjuknya.

  2. Unggah perpustakaan Delta Lake ke ember Amazon S3 di ember Anda. Akun AWS

  3. Saat Anda mengirimkan pekerjaan EMR Tanpa Server dalam konfigurasi aplikasi, sertakan JAR file Delta Lake yang sekarang ada di bucket Anda.

    --conf spark.jars=s3://amzn-s3-demo-bucket/jars/delta-core_2.12-1.1.0.jar
  4. Untuk memastikan bahwa Anda dapat membaca dan menulis dari tabel Delta, jalankan PySpark tes sampel.

    from pyspark import SparkConf, SparkContext from pyspark.sql import HiveContext, SparkSession import uuid conf = SparkConf() sc = SparkContext(conf=conf) sqlContext = HiveContext(sc) url = "s3://amzn-s3-demo-bucket/delta-lake/output/1.0.1/%s/" % str(uuid.uuid4()) ## creates a Delta table and outputs to target S3 bucket session.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket session.read.format("delta").load(url).show