Cookie の設定を選択する

当社は、当社のサイトおよびサービスを提供するために必要な必須 Cookie および類似のツールを使用しています。当社は、パフォーマンス Cookie を使用して匿名の統計情報を収集することで、お客様が当社のサイトをどのように利用しているかを把握し、改善に役立てています。必須 Cookie は無効化できませんが、[カスタマイズ] または [拒否] をクリックしてパフォーマンス Cookie を拒否することはできます。

お客様が同意した場合、AWS および承認された第三者は、Cookie を使用して便利なサイト機能を提供したり、お客様の選択を記憶したり、関連する広告を含む関連コンテンツを表示したりします。すべての必須ではない Cookie を受け入れるか拒否するには、[受け入れる] または [拒否] をクリックしてください。より詳細な選択を行うには、[カスタマイズ] をクリックしてください。

Spark で Delta Lake クラスターを使用する - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Spark で Delta Lake クラスターを使用する

Amazon EMR バージョン 6.9.0 以降では、ブートストラップアクションを追加しなくても、Spark クラスターで Delta Lake を使用できます。Amazon EMR リリース 6.8.0 以前の場合、ブートストラップアクションを使用することで、依存関係上必要なファイルを事前にインストールできます。

次の例では AWS CLI 、 を使用して Amazon EMR Spark クラスターで Delta Lake を操作します。

で Amazon EMR で Delta Lake を使用するには AWS Command Line Interface、まずクラスターを作成します。で Delta Lake 分類を指定する方法については AWS Command Line Interface、「クラスターの作成 AWS Command Line Interface 時に を使用して設定を指定する」または「クラスターの作成時に Java SDK を使用して設定を指定する」を参照してください。

  1. 次のコンテンツを含む configurations.json ファイルを作成します。

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"} }]
  2. 次のように設定してクラスターを作成し、サンプルの Amazon S3 bucket pathsubnet ID を実際の値に置き換えます。

    aws emr create-cluster --release-label emr-6.9.0 --applications Name=Spark --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

    次のファイルを、Spark ジョブに依存関係上必要な jar として使用することで、Amazon EMR クラスターと Spark アプリケーションを構築できます。

    /usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
    注記

    Amazon EMR リリース 6.9.0 以降を使用する場合は、/usr/share/aws/delta/lib/delta-core.jar の代わりに /usr/share/aws/delta/lib/delta-spark.jar を使用します。

    詳細については、「Submitting Applications」を参照してください。

    Spark ジョブに依存関係上必要な jar を指定するには、次の設定プロパティを Spark アプリケーションに追加します。

    --conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

    Spark ジョブの依存関係については、「Dependency Management」を参照してください。

    Amazon EMR リリース 6.9.0 以降を使用する場合は、代わりに /usr/share/aws/delta/lib/delta-spark.jar 設定を追加します。

    --conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

Delta Lake の Spark セッションを初期化する

次の例は、インタラクティブな Spark シェルを起動する方法、Spark 送信を使用する方法、Amazon EMR Notebooks を使用して Amazon EMR の Delta Lake を操作する方法をそれぞれ示しています。

spark-shell
  1. SSH を使用してプライマリノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してプライマリノードに接続する」を参照してください。

  2. 以下のコマンドを入力して、Spark シェルを起動します。PySpark シェルを使用するには、spark-shellpyspark に置き換えます。

    spark-shell \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

    Amazon EMR リリース 6.15.0 以降を実行する場合は、次の設定を使用して、Delta Lake での Lake Formation に基づくきめ細かなアクセスコントロールを使用する必要があります。

    spark-shell \ --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \ --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \ --conf spark.sql.catalog.spark_catalog.lf.managed=true
spark-submit
  1. SSH を使用してプライマリノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してプライマリノードに接続する」を参照してください。

  2. Delta Lake の Spark セッションを起動するには、次のコマンドを入力します。

    spark-submit —conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" —conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

    Amazon EMR リリース 6.15.0 以降を実行する場合は、次の設定を使用して、Delta Lake での Lake Formation に基づくきめ細かなアクセスコントロールを使用する必要があります。

    spark-submit \ ` --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \ --conf spark.sql.catalog.spark_catalog.lf.managed=true
EMR Studio notebooks

Amazon EMR Studio ノートブックを使用して Spark セッションを初期化するには、次の例のように、Amazon EMR Notebooks で %%configure マジックコマンドを使用して Spark セッションを設定します。詳細については、「Amazon EMR 管理ガイド」の「EMR Notebooks マジックを使用する」を参照してください。

%%configure -f { "conf": { "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog" } }

Amazon EMR リリース 6.15.0 以降を実行する場合は、次の設定を使用して、Delta Lake での Lake Formation に基づくきめ細かなアクセスコントロールを使用する必要があります。

%%configure -f { "conf": { "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog", "spark.sql.catalog.spark_catalog.lf.managed": "true" } }
  1. SSH を使用してプライマリノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してプライマリノードに接続する」を参照してください。

  2. 以下のコマンドを入力して、Spark シェルを起動します。PySpark シェルを使用するには、spark-shellpyspark に置き換えます。

    spark-shell \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

    Amazon EMR リリース 6.15.0 以降を実行する場合は、次の設定を使用して、Delta Lake での Lake Formation に基づくきめ細かなアクセスコントロールを使用する必要があります。

    spark-shell \ --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \ --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \ --conf spark.sql.catalog.spark_catalog.lf.managed=true

Delta Lake テーブルに書き込む

次の例は、DataFrame を作成し、それを Delta Lake データセットとして書き込む方法を示しています。また、デフォルトの Hadoop ユーザーとして、プライマリノードに SSH 接続し、Spark シェルでデータセットを操作する方法を示しています。

注記

コードサンプルを Spark シェルに貼り付けるには、プロンプトで「:paste」と入力し、例を貼り付けて、[CTRL + D] を押します。

PySpark

Spark には、Python ベースのシェルである pyspark も用意されており、Python で記述した Spark プログラムのプロトタイプ作成に使用できます。spark-shell の場合と同様に、プライマリノードで pyspark を呼び出します。

## Create a DataFrame data = spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")], ["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'"""); data.writeTo("delta_table").append()
Scala
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ // Create a DataFrame val data = Seq(("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date", "last_update_time") // Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'"""); data.write.format("delta").mode("append").saveAsTable("delta_table")
SQL
-- Create a Delta Lake table with the S3 location CREATE TABLE delta_table(id string, creation_date string, last_update_time string) USING delta LOCATION 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'; -- insert data into the table INSERT INTO delta_table VALUES ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z");

Spark には、Python ベースのシェルである pyspark も用意されており、Python で記述した Spark プログラムのプロトタイプ作成に使用できます。spark-shell の場合と同様に、プライマリノードで pyspark を呼び出します。

## Create a DataFrame data = spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")], ["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'"""); data.writeTo("delta_table").append()

Delta Lake テーブルから読み取る

PySpark
ddf = spark.table("delta_table") ddf.show()
Scala
val ddf = spark.table("delta_table") ddf.show()
SQL
SELECT * FROM delta_table;
ddf = spark.table("delta_table") ddf.show()
プライバシーサイト規約Cookie の設定
© 2025, Amazon Web Services, Inc. or its affiliates.All rights reserved.