Uso del marco de Hudi en AWS Glue - AWS Glue

Uso del marco de Hudi en AWS Glue

AWS Glue 3.0 y versiones posteriores son compatibles con el marco de Apache Hudi para lagos de datos. Hudi es un marco de almacenamiento de lagos de datos de código abierto que simplifica el procesamiento progresivo de datos y el desarrollo de canalizaciones de datos. En este tema, se describen las características disponibles para usar los datos en AWS Glue al transportar o almacenar datos en una tabla de Hudi. Para más información sobre Hudi, consulte la documentación oficial de Apache Hudi.

Puede usar AWS Glue para llevar a cabo operaciones de lectura y escritura en tablas de Hudi en Amazon S3, o trabajar con tablas de Hudi mediante el Catálogo de datos de AWS Glue. También se admiten operaciones adicionales, como insertar, actualizar y todas las operaciones de Apache Spark.

nota

Apache Hudi 0.10.1 para AWS Glue 3.0 no admite las tablas Merge on Read (MoR, fusionar al leer) de Hudi.

La siguiente tabla muestra la versión de Hudi que se incluye en cada versión de AWS Glue.

Versión de AWS Glue Versión de Hudi compatible
4.0 0.12.1
3.0 0.10.1

Para más información sobre los marcos de lagos de datos compatibles con AWS Glue, consulte Uso de marcos de lagos de datos con trabajos de ETL de AWS Glue.

Activación de Hudi

Para activar Hudi para AWS Glue, haga las siguientes tareas:

  • Especifique hudi como valor para el parámetro del trabajo --datalake-formats. Para obtener más información, consulte Uso de los parámetros de trabajo en los trabajos de AWS Glue.

  • Cree una clave con el nombre --conf para el trabajo de AWS Glue y establézcala en el siguiente valor. Como alternativa, puede establecer la siguiente configuración mediante SparkConf en su script. Esta configuración ayuda a Apache Spark a gestionar correctamente las tablas de Hudi.

    spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
  • La compatibilidad con los permisos de Lake Formation para Hudi está habilitada de forma predeterminada en AWS Glue 4.0. No se necesita ninguna configuración adicional para leer o escribir en las tablas Hudi registradas en Lake Formation. Para leer una tabla Hudi registrada, el rol de IAM del trabajo de AWS Glue debe tener el permiso SELECT. Para escribir en una tabla Hudi registrada, el rol de IAM del trabajo de AWS Glue debe tener el permiso SUPER. Para obtener más información sobre la administración de los permisos de Lake Formation, consulte Otorgar y revocar permisos en los recursos del catálogo de datos.

Uso de una versión diferente de Hudi

Para usar una versión de Hudi que no sea compatible con AWS Glue, indique sus propios archivos JAR de Hudi mediante el parámetro de trabajo --extra-jars. No incluya hudi como valor para el parámetro de trabajo --datalake-formats.

Ejemplo: escribir una tabla de Hudi en Amazon S3 y registrarla en el Catálogo de datos de AWS Glue

En este script de ejemplo, se muestra cómo escribir una tabla de Hudi en Amazon S3 y registrar la tabla en el Catálogo de datos de AWS Glue. En el ejemplo, se utiliza la herramienta Hive Sync de Hudi para registrar la tabla.

nota

En este ejemplo, se requiere que establezca el parámetro de trabajo de --enable-glue-datacatalog para utilizar el Catálogo de datos de AWS Glue como metaalmacén de Apache Spark Hive. Para obtener más información, consulte Uso de los parámetros de trabajo en los trabajos de AWS Glue.

Python
# Example: Create a Hudi table from a DataFrame # and register the table to Glue Data Catalog additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms", "path": "s3://<s3Path/>" } dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save()
Scala
// Example: Example: Create a Hudi table from a DataFrame // and register the table to Glue Data Catalog val additionalOptions = Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms", "path" -> "s3://<s3Path/>") dataFrame.write.format("hudi") .options(additionalOptions) .mode("append") .save()

Ejemplo: leer una tabla de Hudi de Amazon S3 con el Catálogo de datos de AWS Glue

En este ejemplo, se lee la tabla de Hudi que creó en Ejemplo: escribir una tabla de Hudi en Amazon S3 y registrarla en el Catálogo de datos de AWS Glue de Amazon S3.

nota

En este ejemplo, se requiere que establezca el parámetro de trabajo de --enable-glue-datacatalog para utilizar el Catálogo de datos de AWS Glue como metaalmacén de Apache Spark Hive. Para obtener más información, consulte Uso de los parámetros de trabajo en los trabajos de AWS Glue.

Python

Para este ejemplo, utilice el método GlueContext.create_data_frame.from_catalog().

# Example: Read a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) dataFrame = glueContext.create_data_frame.from_catalog( database = "<your_database_name>", table_name = "<your_table_name>" )
Scala

Para este ejemplo, utilice el método getCatalogSource.

// Example: Read a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame() } }

Ejemplo: actualizar e insertar un objeto DataFrame en una tabla de Hudi en Amazon S3

En este ejemplo, se utiliza el Catálogo de datos de AWS Glue para insertar un DataFrame en la tabla de Hudi que creó en Ejemplo: escribir una tabla de Hudi en Amazon S3 y registrarla en el Catálogo de datos de AWS Glue.

nota

En este ejemplo, se requiere que establezca el parámetro de trabajo de --enable-glue-datacatalog para utilizar el Catálogo de datos de AWS Glue como metaalmacén de Apache Spark Hive. Para obtener más información, consulte Uso de los parámetros de trabajo en los trabajos de AWS Glue.

Python

Para este ejemplo, utilice el método GlueContext.write_data_frame.from_catalog().

# Example: Upsert a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame = dataFrame, database = "<your_database_name>", table_name = "<your_table_name>", additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms" } )
Scala

Para este ejemplo, utilice el método getCatalogSink.

// Example: Upsert a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.JsonOptions import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext) } }

Ejemplo: leer una tabla de Hudi de Amazon S3 con Spark

En este ejemplo, se lee una tabla de Hudi de Amazon S3 con la API de DataFrame de Spark.

Python
# Example: Read a Hudi table from S3 using a Spark DataFrame dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
Scala
// Example: Read a Hudi table from S3 using a Spark DataFrame val dataFrame = spark.read.format("hudi").load("s3://<s3path/>")

Ejemplo: escribir una tabla de Hudi en Amazon S3 con Spark

En este ejemplo, se escribe una tabla de Hudi en Amazon S3 con Spark.

Python
# Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save("s3://<s3Path/>)
Scala
// Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") .options(additionalOptions) .mode("overwrite") .save("s3://<s3path/>")

Ejemplo: Lea y escriba una tabla Hudi con el control de permisos de Lake Formation

En este ejemplo se lee y escribe una tabla Hudi con el control de permisos de Lake Formation.

  1. Cree una tabla Hudi y regístrela en Lake Formation.

    1. Para habilitar el control de permisos de Lake Formation, primero tendrá que registrar la tabla de la ruta Amazon S3 en Lake Formation. Para obtener más información, consulte Registro de una ubicación de Amazon S3. Puede registrarlo desde la consola de Lake Formation o mediante la AWS CLI:

      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>

      Una vez que registre una ubicación de Amazon S3, cualquier tabla de AWS Glue que apunte a la ubicación (o a cualquiera de sus ubicaciones secundarias) devolverá el valor del parámetro IsRegisteredWithLakeFormation como verdadero en la llamada GetTable.

    2. Cree una tabla Hudi que apunte a la ruta de Amazon S3 registrada a través de la API de marcos de datos de Spark:

      hudi_options = { 'hoodie.table.name': table_name, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'path': <S3_TABLE_LOCATION>, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': database_name, 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms' } df_products.write.format("hudi") \ .options(**hudi_options) \ .mode("overwrite") \ .save()
  2. Conceda permiso a Lake Formation para el rol de IAM del trabajo de AWS Glue. Puede conceder permisos desde la consola de Lake Formation o mediante la AWS CLI. Para obtener más información, consulte Concesión de permisos de tabla mediante la consola de Lake Formation y el método de recurso con nombre

  3. Lea la tabla Hudi registrada en Lake Formation. El código es el mismo que leer una tabla Hudi no registrada. Tenga en cuenta que el rol de IAM del trabajo de AWS Glue debe tener el permiso SELECT para que la lectura se realice correctamente.

    val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame()
  4. Escribe en una tabla Hudi registrada en Lake Formation. El código es el mismo que escribir en una tabla Hudi no registrada. Tenga en cuenta que el rol de IAM del trabajo de AWS Glue debe tener el permiso SUPER para que la escritura se realice correctamente.

    glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "<write_operation>", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext)