AWS Glue での Delta Lake フレームワークの使用 - AWS Glue

AWS Glue での Delta Lake フレームワークの使用

AWS Glue 3.0 以降では、Linux Foundation Delta Lake フレームワークを利用できます。Delta Lake は、ACID トランザクションの実行、メタデータ処理のスケーリング、さらにストリーミングとバッチデータ処理の統合を支援する、オープンソースのデータレイクストレージフレームワークです。このトピックでは、Delta Lake テーブルに転送または保存するデータに対して、AWS Glue 内で利用可能な機能について説明します。Delta Lake の詳細については、公式の Delta Lake のドキュメントを参照してください。

Amazon S3 内にある Delta Lake テーブルに対する読み取りおよび書き込みの操作は、AWS Glue を使用して実行できます。あるいは、AWS Glue データカタログにより Delta Lake テーブルを操作することもできます。挿入、更新、およびテーブルに対する一括での読み取りと書き込みなど、その他の操作もサポートされています。Delta Lake テーブルを使用する場合、Delta Lake Python ライブラリに備わったメソッド (例えば、DeltaTable.forPath) も使用可能です。Delta Lake Python ライブラリの詳細については、「Delta Lake の Python ドキュメント」を参照してください。

次の表に、AWS Glue の各バージョンに含まれている Delta Lake のバージョンを一覧で示します。

AWS Glue のバージョン サポートされている Delta Lake バージョン
4.0 2.1.0
3.0 1.0.0

AWS Glue がサポートするデータレイクフレームワークの詳細については、「AWS Glue ETL ジョブでのデータレイクフレームワークの使用」を参照してください。

AWS Glue のための Delta Lake の有効化

AWS Glue で Delta Lake を有効化するには、以下のタスクを実行します。

  • delta--datalake-formats のジョブパラメータの値として指定します。詳細については、「AWS Glue ジョブでジョブパラメータを使用する」を参照してください。

  • AWS Glue ジョブ用に、--conf という名前でキーを作成し、それに次の値を設定します。または、スクリプトで SparkConf を使用して、次の構成を設定することもできます。これらの設定は、Apache Spark が Delta Lake テーブルを適切に処理するために役立ちます。

    spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
  • Delta テーブルに関する Lake Formation の許可のサポートは、AWS Glue 4.0 のためにデフォルトで有効になっています。Lake Formation に登録された Delta テーブルの読み取り/書き込みに追加の設定は必要ありません。登録された Delta テーブルを読み取るには、AWS Glue ジョブの IAM ロールに SELECT 許可が必要です。登録された Delta テーブルに書き込むには、AWS Glue ジョブの IAM ロールに SUPER 許可が必要です。Lake Formation の許可の管理の詳細については、「Data Catalog リソースに対する許可の付与と取り消し」を参照してください。

別のバージョンの Delta Lake を使用する

AWS Glue でサポートされないバージョンの Delta Lake を使用するには、--extra-jars ジョブパラメータにより、独自の Delta Lake JAR ファイルを指定します。--datalake-formats ジョブパラメータの値として、delta は含めないでください。この場合、Delta Lake Python ライブラリを使用するには、--extra-py-files ジョブパラメータにより、ライブラリの JAR ファイルを指定する必要があります。Python ライブラリは、Delta Lake JAR ファイルにパッケージ化されています。

例: Delta Lake テーブルを Amazon S3 に書き込んで、そのテーブルを AWS Glue データカタログに登録する

次の AWS Glue ETL スクリプトは、Delta Lake テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する方法を示しています。

Python
# Example: Create a Delta Lake table from a DataFrame # and register the table to Glue Data Catalog additional_options = { "path": "s3://<s3Path>" } dataFrame.write \ .format("delta") \ .options(**additional_options) \ .mode("append") \ .partitionBy("<your_partitionkey_field>") \ .saveAsTable("<your_database_name>.<your_table_name>")
Scala
// Example: Example: Create a Delta Lake table from a DataFrame // and register the table to Glue Data Catalog val additional_options = Map( "path" -> "s3://<s3Path>" ) dataFrame.write.format("delta") .options(additional_options) .mode("append") .partitionBy("<your_partitionkey_field>") .saveAsTable("<your_database_name>.<your_table_name>")

例: AWS Glue データカタログを使用して Amazon S3 から Delta Lake テーブルを読み取る

次の AWS Glue ETL スクリプトでは、「例: Delta Lake テーブルを Amazon S3 に書き込んで、そのテーブルを AWS Glue データカタログに登録する」で作成した Delta Lake テーブルの読み取りを実行します。

Python

この例では create_data_frame.from_catalog メソッドを使用します。

# Example: Read a Delta Lake table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) df = glueContext.create_data_frame.from_catalog( database="<your_database_name>", table_name="<your_table_name>", additional_options=additional_options )
Scala

この例では getCatalogSource メソッドを使用します。

// Example: Read a Delta Lake table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val df = glueContext.getCatalogSource("<your_database_name>", "<your_table_name>", additionalOptions = additionalOptions) .getDataFrame() } }

例: AWS Glue データカタログを使用して Amazon S3 内にある Delta Lake テーブルに DataFrame を挿入する

この例では、「例: Delta Lake テーブルを Amazon S3 に書き込んで、そのテーブルを AWS Glue データカタログに登録する」で作成した Delta Lake テーブルにデータを挿入します。

注記

この例では、AWS Glue データカタログを Apache Spark Hive のメタストアとして使用するために、--enable-glue-datacatalog ジョブパラメータの設定が必要となります。詳細については、「AWS Glue ジョブでジョブパラメータを使用する」を参照してください。

Python

この例では、write_data_frame.from_catalog メソッドを使用します。

# Example: Insert into a Delta Lake table in S3 using Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame=dataFrame, database="<your_database_name>", table_name="<your_table_name>", additional_options=additional_options )
Scala

この例では getCatalogSink メソッドを使用します。

// Example: Insert into a Delta Lake table in S3 using Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = additionalOptions) .writeDataFrame(dataFrame, glueContext) } }

例: Spark API を使用して Amazon S3 から Delta Lake テーブルを読み取る

この例では、Spark API を使用して Amazon S3 から Delta Lake を読み取ります。

Python
# Example: Read a Delta Lake table from S3 using a Spark DataFrame dataFrame = spark.read.format("delta").load("s3://<s3path/>")
Scala
// Example: Read a Delta Lake table from S3 using a Spark DataFrame val dataFrame = spark.read.format("delta").load("s3://<s3path/>")

例: Spark を使用した Amazon S3 への Delta Lake テーブルの書き込み

この例では、Spark を使用して Amazon S3 に Delta Lake テーブルを書き込みます。

Python
# Example: Write a Delta Lake table to S3 using a Spark DataFrame dataFrame.write.format("delta") \ .options(**additional_options) \ .mode("overwrite") \ .partitionBy("<your_partitionkey_field>") .save("s3://<s3Path>")
Scala
// Example: Write a Delta Lake table to S3 using a Spark DataFrame dataFrame.write.format("delta") .options(additionalOptions) .mode("overwrite") .partitionBy("<your_partitionkey_field>") .save("s3://<s3path/>")

例: Lake Formation の許可のコントロールを使用した Delta Lake テーブルの読み取りおよび書き込み

この例では、Lake Formation の許可のコントロールを使用して Delta Lake テーブルの読み取りと書き込みを行います。

  1. Delta テーブルを作成して Lake Formation に登録する

    1. Lake Formation の許可のコントロールを有効にするには、まずテーブルの Amazon S3 パスを Lake Formation に登録する必要があります。詳細については、「Amazon S3 ロケーションの登録」を参照してください。Lake Formation コンソールから、または AWS CLI を使用して登録できます。

      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>

      Amazon S3 の場所が登録されると、その場所 (またはその子である場所) をポイントするすべての AWS Glue テーブルが、GetTable 呼び出しで IsRegisteredWithLakeFormation パラメータの値を true として返します。

    2. Spark を介して登録された Amazon S3 パスをポイントする Delta テーブルを作成します。

      注記

      次に Python の例を示します。

      dataFrame.write \ .format("delta") \ .mode("overwrite") \ .partitionBy("<your_partitionkey_field>") \ .save("s3://<the_s3_path>")

      データが Amazon S3 に書き込まれた後、AWS Glue クローラーを使用して新しい Delta カタログテーブルを作成します。詳細については、「AWS Glue クローラーによるネイティブデータレイクテーブルサポートの紹介」を参照してください。

      AWS Glue CreateTable API を通じてテーブルを手動で作成することもできます。

  2. AWS Glue ジョブ IAM ロールに Lake Formation の許可を付与します。Lake Formation コンソールから、または AWS CLI を使用して許可を付与できます。詳細については、「Lake Formation コンソールと名前付きリソース方式を使用したテーブル許可の付与」を参照してください。

  3. Lake Formation に登録されている Delta テーブルを読み取ります。このコードは、未登録の Delta テーブルを読み取る場合と同じです。読み取りを成功させるには、AWS Glue ジョブの IAM ロールに SELECT 許可が必要であることに留意してください。

    # Example: Read a Delta Lake table from Glue Data Catalog df = glueContext.create_data_frame.from_catalog( database="<your_database_name>", table_name="<your_table_name>", additional_options=additional_options )
  4. Lake Formation に登録されている Delta テーブルに書き込みます。このコードは、未登録の Delta テーブルに書き込む場合と同じです。書き込みを成功させるには、AWS Glue ジョブの IAM ロールに SUPER 許可が必要であることに留意してください。

    デフォルトでは、AWS Glue は saveMode として Append を使用します。これを変更するには、additional_options で saveMode オプションを設定します。Delta テーブルでの saveMode サポートの詳細については、「テーブルに書き込む」を参照してください。

    glueContext.write_data_frame.from_catalog( frame=dataFrame, database="<your_database_name>", table_name="<your_table_name>", additional_options=additional_options )