EMR Serverless での Delta Lake OSS の使用 - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

EMR Serverless での Delta Lake OSS の使用

Amazon EMR バージョン 6.9.0 以降

注記

Amazon EMR 7.0.0 以降では、Delta Lake 3.0.0 が使用され、delta-core.jar ファイル名が delta-spark.jar に変更されます。Amazon EMR 7.0.0 以降を使用する場合は、設定で delta-spark.jar を指定する必要があります。

Amazon EMR 6.9.0 以降には Delta Lake が含まれているため、Delta Lake を自分でパッケージ化したり、EMR Serverless ジョブで --packages フラグを指定したりする必要がなくなりました。

  1. EMR Serverless ジョブを送信するときは、次の設定プロパティがあり、次のパラメータが sparkSubmitParameters フィールドに含まれていることを確認してください。

    --conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
  2. Delta テーブルの作成と読み取りをテストするローカル delta_sample.py を作成します。

    # delta_sample.py from pyspark.sql import SparkSession import uuid url = "s3://amzn-s3-demo-bucket/delta-lake/output/%s/" % str(uuid.uuid4()) spark = SparkSession.builder.appName("DeltaSample").getOrCreate() ## creates a Delta table and outputs to target S3 bucket spark.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket spark.read.format("delta").load(url).show
  3. AWS CLI を使用して、delta_sample.py ファイルを Amazon S3 バケットにアップロードします。次に、start-job-run コマンドを使用して、既存の EMR Serverless アプリケーションにジョブを送信します。

    aws s3 cp delta_sample.py s3://amzn-s3-demo-bucket/code/ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn job-role-arn \ --name emr-delta \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://amzn-s3-demo-bucket/code/delta_sample.py", "sparkSubmitParameters": "--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" } }'

Delta Lake で Python ライブラリを使用するには、delta-core ライブラリを依存関係としてパッケージ化するか、カスタムイメージとして使用して追加できます。

また、SparkContext.addPyFile を使用して delta-core JAR ファイルから Python ライブラリを追加することもできます。

import glob from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() spark.sparkContext.addPyFile(glob.glob("/usr/share/aws/delta/lib/delta-core_*.jar")[0])

Amazon EMR バージョン 6.8.0 以前

Amazon EMR 6.8.0 以前を使用している場合は、以下の手順に従って、EMR Serverless アプリケーションで Delta Lake OSS を使用します。

  1. Amazon EMR Serverless アプリケーションの Spark のバージョンと互換性のある Delta Lake のオープンソースバージョンをビルドするには、Delta GitHub に移動して指示に従います。

  2. Delta Lake ライブラリを AWS アカウントの Amazon S3 バケットにアップロードします。

  3. アプリケーション設定で EMR Serverless ジョブを送信するときは、バケットに現在ある Delta Lake JAR ファイルを含めます。

    --conf spark.jars=s3://amzn-s3-demo-bucket/jars/delta-core_2.12-1.1.0.jar
  4. Delta テーブルとの読み書きを確実に行うには、サンプル PySpark テストを実行します。

    from pyspark import SparkConf, SparkContext from pyspark.sql import HiveContext, SparkSession import uuid conf = SparkConf() sc = SparkContext(conf=conf) sqlContext = HiveContext(sc) url = "s3://amzn-s3-demo-bucket/delta-lake/output/1.0.1/%s/" % str(uuid.uuid4()) ## creates a Delta table and outputs to target S3 bucket session.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket session.read.format("delta").load(url).show