選取您的 Cookie 偏好設定

我們使用提供自身網站和服務所需的基本 Cookie 和類似工具。我們使用效能 Cookie 收集匿名統計資料,以便了解客戶如何使用我們的網站並進行改進。基本 Cookie 無法停用,但可以按一下「自訂」或「拒絕」以拒絕效能 Cookie。

如果您同意,AWS 與經核准的第三方也會使用 Cookie 提供實用的網站功能、記住您的偏好設定,並顯示相關內容,包括相關廣告。若要接受或拒絕所有非必要 Cookie,請按一下「接受」或「拒絕」。若要進行更詳細的選擇,請按一下「自訂」。

將 Delta Lake 叢集與 Spark 搭配使用 - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將 Delta Lake 叢集與 Spark 搭配使用

從 Amazon EMR 6.9.0 版開始,您可以將 Delta Lake 與 Spark 叢集搭配使用,而無需執行引導操作。對於 Amazon EMR 6.8.0 版及更低版本,您可以使用引導操作來預先安裝必要的依存項目。

下列範例使用 AWS CLI 在 Amazon EMR Spark 叢集上使用 Delta Lake。

若要將 Delta Lake on Amazon EMR 與 搭配使用 AWS Command Line Interface,請先建立叢集。如需有關如何使用 指定 Delta Lake 分類的資訊 AWS Command Line Interface,請參閱在建立叢集 AWS Command Line Interface 時使用 提供組態,或在建立叢集時使用 Java 開發套件提供組態

  1. 使用下列內容建立檔案 configurations.json

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"} }]
  2. 使用下列組態建立叢集,並將範例 Amazon S3 bucket pathsubnet ID 取代為您自己的值。

    aws emr create-cluster --release-label emr-6.9.0 --applications Name=Spark --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

    或者,您可以建立 Amazon EMR 叢集和 Spark 應用程式,並將下列檔案作為 Spark 作業中的 JAR 依存項目:

    /usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
    注意

    如果您使用 Amazon EMR 6.9.0 版或更新版本,請使用 /usr/share/aws/delta/lib/delta-spark.jar而非 /usr/share/aws/delta/lib/delta-core.jar

    如需詳細資訊,請參閱提交應用程式

    若要在 Spark 作業中包含 jar 依存項目,您可以將下列組態屬性新增至 Spark 應用程式:

    --conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

    如需有關 Spark 作業相依性的詳細資訊,請參閱相依性管理

    如果您使用 Amazon EMR 6.9.0 版或更新版本,請改為新增/usr/share/aws/delta/lib/delta-spark.jar組態。

    --conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

初始化 Delta Lake 的 Spark 工作階段

下列範例顯示如何啟動互動式 Spark Shell、使用 Spark 提交,或透過 Amazon EMR Notebooks 在 Amazon EMR 上使用 Delta Lake。

spark-shell
  1. 使用 SSH 連接至主節點。如需詳細資訊,請參閱《Amazon EMR 管理指南》中的使用 SSH 連接至主節點

  2. 輸入以下命令啟動 Spark shell。若要使用 PySpark Shell,請將 spark-shell 取代為 pyspark

    spark-shell \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

    如果您執行 Amazon EMR 6.15.0 或更新版本,您還必須使用下列組態,以使用根據 Lake Formation 搭配 Delta Lake 的精細存取控制。

    spark-shell \ --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \ --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \ --conf spark.sql.catalog.spark_catalog.lf.managed=true
spark-submit
  1. 使用 SSH 連接至主節點。如需詳細資訊,請參閱《Amazon EMR 管理指南》中的使用 SSH 連接至主節點

  2. 輸入下列命令啟動 Delta Lake 的 Spark 工作階段。

    spark-submit —conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" —conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

    如果您執行 Amazon EMR 6.15.0 或更新版本,您還必須使用下列組態,以使用根據 Lake Formation 搭配 Delta Lake 的精細存取控制。

    spark-submit \ ` --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \ --conf spark.sql.catalog.spark_catalog.lf.managed=true
EMR Studio notebooks

若要使用 Amazon EMR Studio 筆記本初始化 Spark 工作階段,請使用 Amazon EMR Notebooks 中的 %%configure 魔法命令來設定 Spark 工作階段,如下列範例所示。如需詳細資訊,請參閱《Amazon EMR 管理指南》中的使用 EMR Notebooks 魔法

%%configure -f { "conf": { "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog" } }

如果您執行 Amazon EMR 6.15.0 或更新版本,您還必須使用下列組態,以使用根據 Lake Formation 搭配 Delta Lake 的精細存取控制。

%%configure -f { "conf": { "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog", "spark.sql.catalog.spark_catalog.lf.managed": "true" } }
  1. 使用 SSH 連接至主節點。如需詳細資訊,請參閱《Amazon EMR 管理指南》中的使用 SSH 連接至主節點

  2. 輸入以下命令啟動 Spark shell。若要使用 PySpark Shell,請將 spark-shell 取代為 pyspark

    spark-shell \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

    如果您執行 Amazon EMR 6.15.0 或更新版本,您還必須使用下列組態,以使用根據 Lake Formation 搭配 Delta Lake 的精細存取控制。

    spark-shell \ --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \ --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \ --conf spark.sql.catalog.spark_catalog.lf.managed=true

寫入至 Delta Lake 資料表

下列範例顯示如何建立 DataFrame 並將其寫入為 Delta Lake 資料集。此範例顯示如何使用 Spark Shell 處理資料集,同時使用 SSH 作為預設 hadoop 使用者連接至主節點。

注意

若要將程式碼範例貼到 Spark Shell 中,請在提示字元中鍵入 :paste、貼上範例,然後按 CTRL + D

PySpark

Spark 包含 Python 型 Shell pyspark,您可以使用該 Shell 來開發以 Python 撰寫之 Spark 程式的原型。如同使用 spark-shell,在主節點上調用 pyspark

## Create a DataFrame data = spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")], ["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'"""); data.writeTo("delta_table").append()
Scala
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ // Create a DataFrame val data = Seq(("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date", "last_update_time") // Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'"""); data.write.format("delta").mode("append").saveAsTable("delta_table")
SQL
-- Create a Delta Lake table with the S3 location CREATE TABLE delta_table(id string, creation_date string, last_update_time string) USING delta LOCATION 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'; -- insert data into the table INSERT INTO delta_table VALUES ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z");

Spark 包含 Python 型 Shell pyspark,您可以使用該 Shell 來開發以 Python 撰寫之 Spark 程式的原型。如同使用 spark-shell,在主節點上調用 pyspark

## Create a DataFrame data = spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")], ["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'"""); data.writeTo("delta_table").append()

從 Delta Lake 資料表中讀取

PySpark
ddf = spark.table("delta_table") ddf.show()
Scala
val ddf = spark.table("delta_table") ddf.show()
SQL
SELECT * FROM delta_table;
ddf = spark.table("delta_table") ddf.show()
隱私權網站條款Cookie 偏好設定
© 2025, Amazon Web Services, Inc.或其附屬公司。保留所有權利。