Utilisation du cadre Hudi dans AWS Glue - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation du cadre Hudi dans AWS Glue

AWS Glue 3.0 et versions ultérieures prennent en charge le cadre Apache Hudi pour les lacs de données. Hudi est un cadre de stockage de lac de données open source qui simplifie le traitement incrémentiel des données et le développement de pipelines de données. Cette rubrique décrit les fonctionnalités disponibles pour utiliser vos données dans AWS Glue lors de leur transport ou de leur stockage dans une table Hudi. Pour en savoir plus sur Hudi, consultez la documentation officielle d'Apache Hudi.

Vous pouvez utiliser AWS Glue pour effectuer des opérations de lecture et d'écriture sur des tables Hudi dans Amazon S3, ou travailler avec des tables Hudi à l'aide du catalogue de données AWS Glue. Des opérations supplémentaires, notamment l'insertion, la mise à jour et toutes les opérations d'Apache Spark, sont également prises en charge.

Note

Apache Hudi 0.10.1 pour AWS Glue 3.0 ne prend pas en charge les tables Hudi Merge on Read (MoR).

Le tableau suivant répertorie la version de Hudi incluse dans chaque version de AWS Glue.

Version de AWS Glue Version de Hudi prise en charge
4.0 0.12.1
3.0 0,1,1

Pour en savoir plus sur les cadres de lac de données pris en charge par AWS Glue, consultez Utilisation de cadres de lac de données avec des tâches AWS Glue ETL.

Activation de Hudi

Pour activer Hudi pour AWS Glue, procédez comme suit :

  • Spécifiez hudi comme valeur pour le paramètre de tâche --datalake-formats. Pour de plus amples informations, veuillez consulter Paramètres des tâches AWS Glue.

  • Créez une clé nommée --conf pour votre tâche AWS Glue et définissez-la sur la valeur suivante. Vous pouvez également définir la configuration suivante à l'aide de SparkConf dans votre script. Ces paramètres permettent à Apache Spark de gérer correctement les tables Hudi.

    spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
  • La prise en charge des autorisations de Lake Formation pour Hudi est activée par défaut pour AWS Glue 4.0. Aucune configuration supplémentaire n'est nécessaire pour la lecture/écriture dans les tables Hudi enregistrées dans Lake Formation. Pour lire une table Hudi enregistrée, le rôle IAM de la tâche AWS Glue doit disposer de l'autorisation SELECT. Pour écrire dans une table Hudi enregistrée, le rôle IAM de la tâche AWS Glue doit disposer de l'autorisation SELECT. Pour en savoir plus sur la gestion des autorisations de Lake Formation, consultez la section Octroi et révocation d'autorisations liées aux ressources du catalogue de données.

Utilisation d'une autre version de Hudi

Pour utiliser une version de Hudi non prise en charge par AWS Glue, spécifiez vos propres fichiers JAR Hudi à l'aide du paramètre de tâche --extra-jars. N'incluez pas hudi comme valeur du paramètre de tâche --datalake-formats.

Exemple : écriture d'une table Hudi sur Amazon S3 et enregistrement dans le catalogue de données AWS Glue

Cet exemple de script montre comment écrire une table Hudi sur Amazon S3 et enregistrer la table dans le catalogue de données AWS Glue. L'exemple utilise l'outil de synchronisation Hive de Hudi pour enregistrer la table.

Note

Cet exemple vous demande de définir le paramètre de tâche --enable-glue-datacatalog afin d'utiliser le catalogue de données AWS Glue en tant que métastore Hive Apache Spark. Pour en savoir plus, veuillez consulter la section Paramètres des tâches AWS Glue.

Python
# Example: Create a Hudi table from a DataFrame # and register the table to Glue Data Catalog additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms", "path": "s3://<s3Path/>" } dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save()
Scala
// Example: Example: Create a Hudi table from a DataFrame // and register the table to Glue Data Catalog val additionalOptions = Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms", "path" -> "s3://<s3Path/>") dataFrame.write.format("hudi") .options(additionalOptions) .mode("append") .save()

Exemple : lecture d'une table Hudi depuis Amazon S3 à l'aide du catalogue de données AWS Glue

Cet exemple lit la table Hudi que vous avez créée dans Exemple : écriture d'une table Hudi sur Amazon S3 et enregistrement dans le catalogue de données AWS Glue depuis Amazon S3.

Note

Cet exemple vous demande de définir le paramètre de tâche --enable-glue-datacatalog afin d'utiliser le catalogue de données AWS Glue en tant que métastore Hive Apache Spark. Pour en savoir plus, veuillez consulter la section Paramètres des tâches AWS Glue.

Python

Pour cet exemple, utilisez la méthode GlueContext.create_data_frame.from_catalog().

# Example: Read a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) dataFrame = glueContext.create_data_frame.from_catalog( database = "<your_database_name>", table_name = "<your_table_name>" )
Scala

Pour cet exemple, utilisez la méthode getCatalogSource.

// Example: Read a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame() } }

Exemple : mise à jour et insertion d'un DataFrame dans une table Hudi d'Amazon S3

Cet exemple utilise le catalogue de données AWS Glue pour insérer un DataFrame dans la table Hudi que vous avez créée dans Exemple : écriture d'une table Hudi sur Amazon S3 et enregistrement dans le catalogue de données AWS Glue.

Note

Cet exemple vous demande de définir le paramètre de tâche --enable-glue-datacatalog afin d'utiliser le catalogue de données AWS Glue en tant que métastore Hive Apache Spark. Pour en savoir plus, veuillez consulter la section Paramètres des tâches AWS Glue.

Python

Pour cet exemple, utilisez la méthode GlueContext.write_data_frame.from_catalog().

# Example: Upsert a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame = dataFrame, database = "<your_database_name>", table_name = "<your_table_name>", additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms" } )
Scala

Pour cet exemple, utilisez la méthode getCatalogSink.

// Example: Upsert a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.JsonOptions import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext) } }

Exemple : lecture d'une table Hudi depuis Amazon S3 à l'aide de Spark

Cet exemple lit une table Hudi depuis Amazon S3 à l'aide de l'API Spark DataFrame.

Python
# Example: Read a Hudi table from S3 using a Spark DataFrame dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
Scala
// Example: Read a Hudi table from S3 using a Spark DataFrame val dataFrame = spark.read.format("hudi").load("s3://<s3path/>")

Exemple : écriture d'une table Hudi sur Amazon S3 à l'aide de Spark

Cet exemple écrit une table Hudi sur Amazon S3 à l'aide de Spark.

Python
# Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save("s3://<s3Path/>)
Scala
// Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") .options(additionalOptions) .mode("overwrite") .save("s3://<s3path/>")

Exemple : lecture et écriture d'une table Hudi avec contrôle des autorisations de Lake Formation

Cet exemple lit et écrit dans une table Hudi avec contrôle des autorisations de Lake Formation.

  1. Créez une table Hudi et enregistrez-la dans Lake Formation.

    1. Pour activer le contrôle des autorisations de Lake Formation, vous devez d'abord enregistrer le chemin d'accès Amazon S3 de la table sur Lake Formation. Pour plus d'informations, consultez la rubrique Enregistrement d'un emplacement Amazon S3. Vous pouvez l'enregistrer depuis la console Lake Formation ou à l'aide d'AWS CLI :

      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>

      Une fois que vous avez enregistré un emplacement Amazon S3, toute table AWS Glue pointant vers cet emplacement (ou l'un de ses emplacements enfants) renverra la valeur du paramètre IsRegisteredWithLakeFormation comme vraie dans l'appel GetTable.

    2. Créez une table Hudi qui pointe vers le chemin Amazon S3 enregistré via l'API Spark dataframe :

      hudi_options = { 'hoodie.table.name': table_name, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'path': <S3_TABLE_LOCATION>, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': database_name, 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms' } df_products.write.format("hudi") \ .options(**hudi_options) \ .mode("overwrite") \ .save()
  2. Accordez à Lake Formation l'autorisation d'accéder au rôle IAM AWS Glue. Vous pouvez accorder des autorisations depuis la console Lake Formation ou utiliser la CLI AWS. Pour plus d'informations, consultez la rubrique Octroi d'autorisations de table via la console Lake Formation et la méthode de ressource nommée

  3. Lisez la table Hudi enregistrée dans Lake Formation. Le code est le même que celui de la lecture d'une table Hudi non enregistrée. Notez que le rôle IAM de la tâche AWS Glue doit disposer de l'autorisation SELECT pour que la lecture réussisse.

    val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame()
  4. Écrire dans une table Hudi enregistrée dans Lake Formation. Le code est le même que celui de l'écriture dans une table Hudi non enregistrée. Notez que le rôle IAM de la tâche AWS Glue doit disposer de l'autorisation SUPER pour que l'écriture réussisse.

    glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "<write_operation>", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext)