Utilizzo del framework Hudi in AWS Glue - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo del framework Hudi in AWS Glue

AWS Glue 3.0 e versioni successive supportano il framework Apache Hudi per i data lake. Hudi è un framework di archiviazione di data lake open source che semplifica l'elaborazione incrementale dei dati e lo sviluppo di pipeline di dati. Questo argomento descrive le funzionalità disponibili per l'utilizzo dei dati in AWS Glue durante il trasporto o l'archiviazione dei dati in una tabella Hudi. Per ulteriori informazioni su Hudi, consulta la documentazione ufficiale di Apache Hudi.

È possibile usare AWS Glue per eseguire operazioni di lettura e scrittura sulle tabelle Hudi in Amazon S3 o lavorare con le tabelle Hudi utilizzando il catalogo dati AWS Glue. Sono supportate anche operazioni aggiuntive, tra cui inserimento, aggiornamento e tutte le operazioni di Apache Spark.

Nota

Apache Hudi 0.10.1 per AWS Glue 3.0 non supporta le tabelle Hudi Merge on Read (MoR).

La tabella seguente elenca la versione di Hudi inclusa in ogni versione di AWS Glue.

Versione di AWS Glue Versione Hudi supportata
4.0 0.12.1
3.0 0,10,1

Per ulteriori informazioni sui framework di data lake supportati da AWS Glue, consulta Utilizzo di framework data lake con processi ETL di AWS Glue.

Abilitazione di Hudi

Per abilitare Hudi per AWS Glue, completa le seguenti attività:

  • Specifica hudi come valore per i parametri del processo --datalake-formats. Per ulteriori informazioni, consulta Parametri del processo AWS Glue.

  • Crea una chiave denominata --conf per il tuo processo AWS Glue e impostala sul seguente valore. In alternativa, puoi impostare la seguente configurazione usando SparkConf nel tuo script. Queste impostazioni consentono ad Apache Spark di gestire correttamente le tabelle Hudi.

    spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
  • Il supporto delle autorizzazioni di Lake Formation per Hudi è abilitato per impostazione predefinita per AWS Glue 4.0. Non è necessaria alcuna configurazione aggiuntiva per la lettura/scrittura su tabelle Hudi registrate da Lake Formation. Per leggere una tabella Hudi registrata, il ruolo IAM del processo AWS Glue deve disporre dell'autorizzazione SELECT. Per scrivere su una tabella Hudi registrata, il ruolo IAM del processo AWS Glue deve disporre dell'autorizzazione SUPER. Per ulteriori informazioni sulla gestione delle autorizzazioni di Lake Formation, consulta Concessione e revoca delle autorizzazioni del catalogo dati.

Utilizzo di una versione differente di Hudi

Per utilizzare una versione di Hudi non supportata da AWS Glue, specifica i tuoi file JAR Hudi utilizzando il parametro del processo --extra-jars. Non includere hudi come valore per il parametro del processo --datalake-formats.

Esempio: scrittura di una tabella Hudi su Amazon S3 e registrazione nel catalogo dati AWS Glue

Il seguente script di esempio mostra come scrivere una tabella Hudi su Amazon S3 e registrarla nel catalogo dati AWS Glue. Per registrare la tabella, viene utilizzato lo strumento Hive Sync di Hudi.

Nota

Questo esempio consente di impostare il parametro del processo --enable-glue-datacatalog in modo da utilizzare il catalogo dati AWS Glue come metastore Apache Spark Hive. Per ulteriori informazioni, consulta Parametri del processo AWS Glue.

Python
# Example: Create a Hudi table from a DataFrame # and register the table to Glue Data Catalog additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms", "path": "s3://<s3Path/>" } dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save()
Scala
// Example: Example: Create a Hudi table from a DataFrame // and register the table to Glue Data Catalog val additionalOptions = Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms", "path" -> "s3://<s3Path/>") dataFrame.write.format("hudi") .options(additionalOptions) .mode("append") .save()

Esempio: lettura di una tabella Hudi da Amazon S3 tramite il catalogo dati AWS Glue

In questo esempio viene letta la tabella Hudi che hai creato in Esempio: scrittura di una tabella Hudi su Amazon S3 e registrazione nel catalogo dati AWS Glue da Amazon S3.

Nota

Questo esempio consente di impostare il parametro del processo --enable-glue-datacatalog in modo da utilizzare il catalogo dati AWS Glue come metastore Apache Spark Hive. Per ulteriori informazioni, consulta Parametri del processo AWS Glue.

Python

Per questo esempio, usa il metodo GlueContext.create_data_frame.from_catalog().

# Example: Read a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) dataFrame = glueContext.create_data_frame.from_catalog( database = "<your_database_name>", table_name = "<your_table_name>" )
Scala

Per questo esempio, utilizza il metodo getCatalogSource.

// Example: Read a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame() } }

Esempio: aggiornamento e inserimento di un DataFrame in una tabella Hudi in Amazon S3

In questo esempio viene utilizzato il catalogo dati AWS Glue per inserire un DataFrame nella tabella Hudi creata in Esempio: scrittura di una tabella Hudi su Amazon S3 e registrazione nel catalogo dati AWS Glue.

Nota

Questo esempio consente di impostare il parametro del processo --enable-glue-datacatalog in modo da utilizzare il catalogo dati AWS Glue come metastore Apache Spark Hive. Per ulteriori informazioni, consulta Parametri del processo AWS Glue.

Python

Per questo esempio, usa il metodo GlueContext.write_data_frame.from_catalog().

# Example: Upsert a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame = dataFrame, database = "<your_database_name>", table_name = "<your_table_name>", additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms" } )
Scala

Per questo esempio, utilizza il metodo getCatalogSink.

// Example: Upsert a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.JsonOptions import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext) } }

Esempio: lettura di una tabella Hudi da Amazon S3 tramite Spark

In questo esempio viene letta una tabella Hudi da Amazon S3 tramite l'API Spark DataFrame.

Python
# Example: Read a Hudi table from S3 using a Spark DataFrame dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
Scala
// Example: Read a Hudi table from S3 using a Spark DataFrame val dataFrame = spark.read.format("hudi").load("s3://<s3path/>")

Esempio: scrittura di una tabella Hudi su Amazon S3 tramite Spark

In questo esempio viene scritta una tabella Hudi su Amazon S3 tramite Spark.

Python
# Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save("s3://<s3Path/>)
Scala
// Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") .options(additionalOptions) .mode("overwrite") .save("s3://<s3path/>")

Esempio: lettura e scrittura della tabella Hudi con il controllo delle autorizzazioni di Lake Formation

Questo esempio legge da e scrive su una tabella Hudi con il controllo delle autorizzazioni di Lake Formation.

  1. Crea una tabella Hudi e registrala in Lake Formation.

    1. Per abilitare il controllo delle autorizzazioni di Lake Formation, devi prima registrare il percorso della tabella Amazon S3 su Lake Formation. Per ulteriori informazioni, consulta la pagina Registrazione di una posizione Amazon S3. Puoi registrarlo dalla console di Lake Formation o utilizzando la CLI AWS:

      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>

      Una volta registrata una posizione Amazon S3, qualsiasi tabella AWS Glue che punta alla posizione, o a una delle sue sedi secondarie, restituirà il valore del parametro IsRegisteredWithLakeFormation come true nella chiamata GetTable.

    2. Crea una tabella Hudi che punti al percorso registrato di Amazon S3 tramite l'API Spark DataFrame:

      hudi_options = { 'hoodie.table.name': table_name, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'path': <S3_TABLE_LOCATION>, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': database_name, 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms' } df_products.write.format("hudi") \ .options(**hudi_options) \ .mode("overwrite") \ .save()
  2. Concedi a Lake Formation l'autorizzazione per il ruolo IAM del processo AWS Glue. Puoi concedere le autorizzazioni dalla console di Lake Formation o utilizzando la CLI AWS. Per ulteriori informazioni, consulta la pagina Concessione delle autorizzazioni alla tabella tramite la console di Lake Formation e il metodo delle risorse denominate

  3. Leggi la tabella Hudi registrata in Lake Formation. Il codice equivale a leggere una tabella Hudi non registrata. Il ruolo IAM del processo AWS Glue deve disporre dell'autorizzazione SELECT affinché la lettura abbia esito positivo.

    val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame()
  4. Scrivi sulla tabella Hudi registrata in Lake Formation. Il codice equivale a scrivere su una tabella Hudi non registrata. Il ruolo IAM del processo AWS Glue deve disporre dell'autorizzazione SUPER affinché la scrittura abbia esito positivo.

    glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "<write_operation>", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext)