Verwenden des Hudi-Frameworks in AWS Glue - AWS Glue

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden des Hudi-Frameworks in AWS Glue

AWS Glue 3.0 und höher unterstützt das Apache-Hudi-Framework für Data Lakes. Hudi ist ein Open-Source-Framework für Data-Lake-Speicher, das die inkrementelle Datenverarbeitung und die Entwicklung von Datenpipelines vereinfacht. Dieses Thema behandelt die verfügbaren Features zur Verwendung Ihrer Daten in AWS Glue, wenn Sie Ihre Daten in einer Hudi-Tabelle transportieren oder speichern. Weitere Informationen zu Hudi finden Sie in der offiziellen Apache-Hudi-Dokumentation.

Sie können AWS Glue verwenden, um Lese- und Schreibvorgänge für Hudi-Tabellen in Amazon S3 durchzuführen, oder mit Hudi-Tabellen arbeiten, indem Sie den AWS Glue Data Catalog verwenden. Zusätzliche Vorgänge wie Einfügen, Aktualisieren und alle Apache-Spark-Vorgänge werden ebenfalls unterstützt.

Anmerkung

Apache Hudi 0.10.1 für AWS Glue 3.0 unterstützt keine Hudi Merge on Read (MoR)-Tabellen.

In der folgenden Tabelle ist die Hudi-Version aufgelistet, die in jeder AWS-Glue-Version enthalten ist.

AWS-Glue-Version Unterstützte Hudi-Versionen
4,0 0.12.1
3.0 0.10.1

Weitere Informationen zu den von AWS-Glue unterstützten Data-Lake-Frameworks finden Sie unter Verwendung von Data-Lake-Frameworks mit AWS Glue-ETL-Aufträgen.

Aktivieren von Hudi

Führen Sie die folgenden Aufgaben aus, um Hudi für AWS Glue zu aktivieren:

  • Geben Sie hudi als Wert für den Auftragsparameter --datalake-formats an. Weitere Informationen finden Sie unter AWS Glue-Auftragsparameter.

  • Erstellen Sie einen Schlüssel mit dem Namen --conf für Ihren AWS-Glue-Auftrag und legen Sie ihn auf den folgenden Wert fest. Alternativ können Sie die folgende Konfiguration mit SparkConf in Ihrem Skript festlegen. Diese Einstellungen helfen Apache Spark bei der korrekten Handhabung von Hudi-Tabellen.

    spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
  • Die Unterstützung von Lake-Formation-Berechtigungen für Hudi ist standardmäßig nur für AWS Glue 4.0 aktiviert. Für das Lesen/Schreiben in Hudi-Tabellen, die bei Lake Formation registriert sind, ist keine zusätzliche Konfiguration erforderlich. Um eine registrierte Hudi-Tabelle lesen zu können, muss die IAM-Rolle des AWS-Glue-Auftrags über die SELECT-Berechtigung verfügen. Um in eine registrierte Hudi-Tabelle schreiben zu können, muss die IAM-Rolle des AWS-Glue-Auftrags über die SUPER-Berechtigung verfügen. Weitere Informationen zur Verwaltung von Lake-Formation-Berechtigungen finden Sie unter Granting and revoking permissions on Data Catalog resources.

Verwenden einer anderen Hudi-Version

Um eine Version von Hudi zu verwenden, die von AWS Glue nicht unterstützt wird, geben Sie Ihre eigenen Hudi-JAR-Dateien mit dem --extra-jars-Auftragsparameter an. Schließen Sie hudi nicht als Wert für den Auftragsparameter --datalake-formats ein.

Beispiel: Schreiben einer Hudi-Tabelle in Amazon S3 und deren Registrierung im AWS Glue Data Catalog

Dieses Beispielskript zeigt, wie Sie eine Hudi-Tabelle in Amazon S3 schreiben und die Tabelle im AWS Glue Data Catalog registrieren. Das Beispiel verwendet das Hive-Sync-Tool von Hudi zum Registrieren der Tabelle.

Anmerkung

In diesem Beispiel müssen Sie den --enable-glue-datacatalog-Auftragsparameter festlegen, um den AWS Glue Data Catalog als Apache-Spark-Hive-Metaspeicher verwenden zu können. Weitere Informationen hierzu finden Sie unter AWS Glue-Auftragsparameter.

Python
# Example: Create a Hudi table from a DataFrame # and register the table to Glue Data Catalog additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms", "path": "s3://<s3Path/>" } dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save()
Scala
// Example: Example: Create a Hudi table from a DataFrame // and register the table to Glue Data Catalog val additionalOptions = Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms", "path" -> "s3://<s3Path/>") dataFrame.write.format("hudi") .options(additionalOptions) .mode("append") .save()

Beispiel: Lesen einer Hudi-Tabelle aus Amazon S3 mit dem AWS Glue Data Catalog

In diesem Beispiel wird die Hudi-Tabelle gelesen, die Sie in Beispiel: Schreiben einer Hudi-Tabelle in Amazon S3 und deren Registrierung im AWS Glue Data Catalog aus Amazon S3 erstellt haben.

Anmerkung

In diesem Beispiel müssen Sie den --enable-glue-datacatalog-Auftragsparameter festlegen, um den AWS Glue Data Catalog als Apache-Spark-Hive-Metaspeicher verwenden zu können. Weitere Informationen hierzu finden Sie unter AWS Glue-Auftragsparameter.

Python

Verwenden Sie für dieses Beispiel die GlueContext.create_data_frame.from_catalog()-Methode.

# Example: Read a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) dataFrame = glueContext.create_data_frame.from_catalog( database = "<your_database_name>", table_name = "<your_table_name>" )
Scala

Verwenden Sie für dieses Beispiel die getCatalogSource-Methode.

// Example: Read a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame() } }

Beispiel: Aktualisieren und Einfügen eines DataFrame in eine Hudi-Tabelle in Amazon S3

In diesem Beispiel wird der AWS Glue Data Catalog verwendet, um einen DataFrame in die Hudi-Tabelle einzufügen, die Sie in Beispiel: Schreiben einer Hudi-Tabelle in Amazon S3 und deren Registrierung im AWS Glue Data Catalog erstellt haben.

Anmerkung

In diesem Beispiel müssen Sie den --enable-glue-datacatalog-Auftragsparameter festlegen, um den AWS Glue Data Catalog als Apache-Spark-Hive-Metaspeicher verwenden zu können. Weitere Informationen hierzu finden Sie unter AWS Glue-Auftragsparameter.

Python

Verwenden Sie für dieses Beispiel die GlueContext.write_data_frame.from_catalog()-Methode.

# Example: Upsert a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame = dataFrame, database = "<your_database_name>", table_name = "<your_table_name>", additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms" } )
Scala

Verwenden Sie für dieses Beispiel die getCatalogSink-Methode.

// Example: Upsert a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.JsonOptions import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext) } }

Beispiel: Lesen einer Hudi-Tabelle aus Amazon S3 mit Spark

In diesem Beispiel wird eine Hudi-Tabelle aus Amazon S3 mit der Spark-DataFrame-API gelesen.

Python
# Example: Read a Hudi table from S3 using a Spark DataFrame dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
Scala
// Example: Read a Hudi table from S3 using a Spark DataFrame val dataFrame = spark.read.format("hudi").load("s3://<s3path/>")

Beispiel: Schreiben einer Hudi-Tabelle in Amazon S3 mit Spark

In diesem Beispiel wird eine Hudi-Tabelle mit Spark in Amazon S3 geschrieben.

Python
# Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save("s3://<s3Path/>)
Scala
// Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") .options(additionalOptions) .mode("overwrite") .save("s3://<s3path/>")

Beispiel: Lesen und Schreiben einer Hudi-Tabelle mit Lake-Formation-Berechtigungskontrolle

In diesem Beispiel wird mit Lake-Formation-Berechtigungen in einer Hudi-Tabelle gelesen und geschrieben.

  1. Erstellen einer Hudi-Tabelle und Registrieren in Lake Formation.

    1. Um die Lake-Formation-Berechtigungskontrolle zu aktivieren, müssen Sie zunächst den Amazon-S3-Tabellenpfad auf Lake Formation registrieren. Weitere Informationen finden Sie unter Registrieren eines Amazon-S3-Speicherorts. Sie können ihn entweder über die Lake-Formation-Konsole oder über die AWS CLI registrieren:

      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>

      Sobald Sie einen Amazon-S3-Speicherort registriert haben, gibt jede AWS-Glue-Tabelle, die auf den Speicherort (oder einen seiner untergeordneten Speicherorte) verweist, den Wert für den Parameter IsRegisteredWithLakeFormation im GetTable Aufruf als true zurück.

    2. Erstellen einer Hudi-Tabelle, die über die Spark-DataFrame-API auf den registrierten Amazon-S3-Pfad verweist:

      hudi_options = { 'hoodie.table.name': table_name, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'path': <S3_TABLE_LOCATION>, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': database_name, 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms' } df_products.write.format("hudi") \ .options(**hudi_options) \ .mode("overwrite") \ .save()
  2. Erteilen Sie Lake Formation die Berechtigung für die IAM-Rolle des AWS-Glue-Auftrags. Sie können Berechtigungen entweder über die Lake-Formation-Konsole oder über die AWS CLI gewähren. Weitere Informationen finden Sie unter Granting table permissions using the Lake Formation console and the named resource method.

  3. Lesen der in Lake Formation registrierten Hudi-Tabelle. Der Code entspricht dem Lesen einer nicht registrierten Hudi-Tabelle. Beachten Sie, dass die IAM-Rolle des AWS-Glue-Auftrags über die SELECT-Berechtigung verfügen muss, damit der Lesevorgang erfolgreich ist.

    val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame()
  4. Schreiben in eine in Lake Formation registrierte Hudi-Tabelle. Der Code entspricht dem Schreiben in eine nicht registrierte Hudi-Tabelle. Beachten Sie, dass die IAM-Rolle des AWS-Glue-Auftrags über die SUPER-Berechtigung verfügen muss, damit der Schreibvorgang erfolgreich ist.

    glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "<write_operation>", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext)