Travailler avec un jeu de données Hudi - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Travailler avec un jeu de données Hudi

Hudi prend en charge l'insertion, la mise à jour et la suppression de données dans des jeux de données Hudi via Spark. Pour plus d'informations, consultez Writing Hudi tables dans la documentation Apache Hudi.

Les exemples suivants montrent comment lancer le shell interactif Spark, utiliser Spark submit ou utiliser Amazon EMR Notebooks pour travailler avec Hudi sur Amazon. EMR Vous pouvez également utiliser l' DeltaStreamer utilitaire Hudi ou d'autres outils pour écrire dans un ensemble de données. Tout au long de cette section, les exemples montrent comment utiliser des ensembles de données à l'aide du shell Spark lorsque vous êtes connecté au nœud principal en SSH tant qu'hadooputilisateur par défaut.

Lorsque vous exécutez spark-shell ou spark-sql utilisez Amazon EMR 6.7.0 ou version ultérieure, passez les commandes suivantes. spark-submit

Note

Amazon EMR 6.7.0 utilise Apache Hudi 0.11.0-amzn-0, qui contient des améliorations significatives par rapport aux versions précédentes de Hudi. Pour plus d'informations, consultez le Guide de migration Apache Hudi 0.11.0. Les exemples de cet onglet reflètent ces changements.

Pour ouvrir le shell Spark sur le nœud primaire
  1. Connectez-vous au nœud principal à l'aide deSSH. Pour plus d'informations, consultez Se connecter au nœud principal SSH à l'aide de l'Amazon EMR Management Guide.

  2. Entrez la commande suivante pour lancer le shell Spark. Pour utiliser la PySpark coque, remplacez spark-shell avec pyspark.

    spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

Lorsque vous exécutez spark-shell ou spark-sql utilisez Amazon EMR 6.6.x ou une version antérieure, passez les commandes suivantes. spark-submit

Note
  • Amazon EMR 6.2, 5.31 et versions ultérieures (Hudi 0.6.x et versions ultérieures) peuvent les omettre de la configuration. spark-avro.jar

  • Amazon EMR 6.5, 5.35 et versions ultérieures (Hudi 0.9.x et versions ultérieures) peuvent ne pas figurer dans la configuration. spark.sql.hive.convertMetastoreParquet=false

  • Amazon EMR 6.6, 5.36 et versions ultérieures (Hudi 0.10.x et versions ultérieures) doivent inclure la HoodieSparkSessionExtension configuration telle que décrite dans le guide Spark de la version : 0.10.0 :

    --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
Pour ouvrir le shell Spark sur le nœud primaire
  1. Connectez-vous au nœud principal à l'aide deSSH. Pour plus d'informations, consultez Se connecter au nœud principal SSH à l'aide de l'Amazon EMR Management Guide.

  2. Entrez la commande suivante pour lancer le shell Spark. Pour utiliser la PySpark coque, remplacez spark-shell avec pyspark.

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Pour utiliser Hudi avec Amazon EMR Notebooks, vous devez d'abord copier les fichiers JAR Hudi du système de fichiers local vers HDFS le nœud principal du cluster de blocs-notes. Vous utilisez ensuite l'éditeur de bloc-notes pour configurer votre EMR bloc-notes afin qu'il utilise Hudi.

Pour utiliser Hudi avec Amazon Notebooks EMR
  1. Créez et lancez un cluster pour Amazon EMR Notebooks. Pour plus d'informations, consultez la section Création de EMR clusters Amazon pour ordinateurs portables dans le guide EMR de gestion Amazon.

  2. Connectez-vous au nœud principal du cluster à l'aide SSH des fichiers jar du système de fichiers local, puis copiez-les HDFS comme indiqué dans les exemples suivants. Dans l'exemple, nous créons un répertoire dans HDFS pour faciliter la gestion des fichiers. Vous pouvez choisir votre propre destinationHDFS, si vous le souhaitez.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
  3. Ouvrez l'éditeur de bloc-notes, entrez le code de l'exemple suivant et exécutez-le.

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}

Pour utiliser Hudi avec Amazon EMR Notebooks, vous devez d'abord copier les fichiers JAR Hudi du système de fichiers local vers HDFS le nœud principal du cluster de blocs-notes. Vous utilisez ensuite l'éditeur de bloc-notes pour configurer votre EMR bloc-notes afin qu'il utilise Hudi.

Pour utiliser Hudi avec Amazon Notebooks EMR
  1. Créez et lancez un cluster pour Amazon EMR Notebooks. Pour plus d'informations, consultez la section Création de EMR clusters Amazon pour ordinateurs portables dans le guide EMR de gestion Amazon.

  2. Connectez-vous au nœud principal du cluster à l'aide SSH des fichiers jar du système de fichiers local, puis copiez-les HDFS comme indiqué dans les exemples suivants. Dans l'exemple, nous créons un répertoire dans HDFS pour faciliter la gestion des fichiers. Vous pouvez choisir votre propre destinationHDFS, si vous le souhaitez.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. Ouvrez l'éditeur de bloc-notes, entrez le code de l'exemple suivant et exécutez-le.

    { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

Initialisation d'une session Spark pour Hudi

Lorsque vous utilisez Scala, vous devez importer les classes suivantes dans votre session Spark. Vous devez le faire une fois par session Spark.

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig

Écrire dans un jeu de données Hudi

Les exemples suivants montrent comment créer un jeu de données Hudi DataFrame et l'écrire sous forme de jeu de données Hudi.

Note

Pour coller des exemples de code dans le shell Spark, tapez :paste à l'invite, collez l'exemple, puis appuyez sur CTRL + D.

Chaque fois que vous écrivez un dans DataFrame un jeu de données Hudi, vous devez spécifierDataSourceWriteOptions. Beaucoup de ces options sont susceptibles d'être identiques dans les opérations d'écriture. L'exemple suivant spécifie les options communes à l'aide de la variable hudiOptions, que les exemples suivants utilisent.

Note

Amazon EMR 6.7.0 utilise Apache Hudi 0.11.0-amzn-0, qui contient des améliorations significatives par rapport aux versions précédentes de Hudi. Pour plus d'informations, consultez le Guide de migration Apache Hudi 0.11.0. Les exemples de cet onglet reflètent ces changements.

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "tableName", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "tableName", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'tableName', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')
Note

Vous pouvez voir « hoodie » au lieu de Hudi dans les exemples de code et les notifications. La base de code Hudi utilise largement l'ancienne orthographe « hoodie ».

DataSourceWriteOptions référence pour Hudi
Option Description

TABLE_NAME

Nom de la table sous laquelle enregistrer l'ensemble de données.

TABLE_TYPE_OPT_KEY

Facultatif. Spécifie si l'ensemble de données est créé en tant que "COPY_ON_WRITE" ou "MERGE_ON_READ". L’argument par défaut est "COPY_ON_WRITE".

RECORDKEY_FIELD_OPT_KEY

Champ de clé d'enregistrement dont la valeur sera utilisée en tant que composant recordKey de HoodieKey. La valeur réelle sera obtenue en appelant .toString() sur la valeur de champ. Des champs imbriqués peuvent être spécifiés à l'aide de la notation par points, par exemple, a.b.c.

PARTITIONPATH_FIELD_OPT_KEY

Champ de chemin de partition dont la valeur sera utilisée en tant que composant partitionPath de HoodieKey. La valeur réelle sera obtenue en appelant .toString() sur la valeur de champ.

PRECOMBINE_FIELD_OPT_KEY

Champ utilisé dans la pré-combinaison avant l'écriture réelle. Lorsque deux enregistrements ont la même valeur de clé, Hudi sélectionne celui avec la plus grande valeur pour le champ de pré-combinaison, comme déterminé par Object.compareTo(..).

Les options suivantes sont nécessaires uniquement pour enregistrer la table du jeu de données Hudi dans votre metastore. Si vous n'enregistrez pas votre jeu de données Hudi en tant que table dans le metastore Hive, ces options ne sont pas requises.

DataSourceWriteOptions référence pour Hive
Option Description

HIVE_DATABASE_OPT_KEY

Base de données Hive avec laquelle synchroniser. L’argument par défaut est "default".

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

Classe utilisée pour extraire les valeurs de champ de partition dans les colonnes de partition Hive.

HIVE_PARTITION_FIELDS_OPT_KEY

Champ dans l'ensemble de données à utiliser pour déterminer les colonnes de partition Hive.

HIVE_SYNC_ENABLED_OPT_KEY

Lorsqu'elle est définie sur "true", enregistre l'ensemble de données auprès du metastore Apache Hive. L’argument par défaut est "false".

HIVE_TABLE_OPT_KEY

Obligatoire. Nom de la table dans Hive avec laquelle synchroniser. Par exemple, "my_hudi_table_cow".

HIVE_USER_OPT_KEY

Facultatif. Nom d'utilisateur Hive à utiliser lors de la synchronisation. Par exemple, "hadoop".

HIVE_PASS_OPT_KEY

Facultatif. Mot de passe Hive pour l'utilisateur spécifié par HIVE_USER_OPT_KEY.

HIVE_URL_OPT_KEY

Le URL métastore Hive.

Données d'Upsert

L'exemple suivant montre comment insérer des données en écrivant un DataFrame. Contrairement à l'exemple d'insertion précédent, la valeur OPERATION_OPT_KEY est définie sur UPSERT_OPERATION_OPT_VAL. En outre, .mode(SaveMode.Append) est spécifié pour indiquer que l'enregistrement doit être ajouté.

Note

Amazon EMR 6.7.0 utilise Apache Hudi 0.11.0-amzn-0, qui contient des améliorations significatives par rapport aux versions précédentes de Hudi. Pour plus d'informations, consultez le Guide de migration Apache Hudi 0.11.0. Les exemples de cet onglet reflètent ces changements.

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

Supprimer un enregistrement

Pour supprimer définitivement un enregistrement, vous pouvez insérer une charge utile vide. Dans ce cas, l'option PAYLOAD_CLASS_OPT_KEY spécifie la classe EmptyHoodieRecordPayload. L'exemple utilise le même DataFrame,updateDF, utilisé dans l'exemple upsert pour spécifier le même enregistrement.

Note

Amazon EMR 6.7.0 utilise Apache Hudi 0.11.0-amzn-0, qui contient des améliorations significatives par rapport aux versions précédentes de Hudi. Pour plus d'informations, consultez le Guide de migration Apache Hudi 0.11.0. Les exemples de cet onglet reflètent ces changements.

(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

Vous pouvez également supprimer définitivement des données en définissant OPERATION_OPT_KEY sur DELETE_OPERATION_OPT_VAL pour supprimer tous les enregistrements de le jeu de données que vous soumettez. Pour obtenir des instructions sur les suppressions logicielles et pour plus d'informations sur la suppression de données stockées dans des tables Hudi, consultez Suppressions dans la documentation d'Apache Hudi.

Lire à partir d'un jeu de données Hudi

Pour récupérer les données à l'heure actuelle, Hudi effectue des requêtes instantanées par défaut. Voici un exemple d'interrogation de le jeu de données écrit dans S3 dans Écrire dans un jeu de données Hudi. Remplacez s3://amzn-s3-demo-bucket/myhudidataset avec le chemin de votre table, et ajoutez des astérisques génériques pour chaque niveau de partition, plus un astérisque supplémentaire. Dans cet exemple, il existe un niveau de partition. Nous avons donc ajouté deux symboles génériques.

Note

Amazon EMR 6.7.0 utilise Apache Hudi 0.11.0-amzn-0, qui contient des améliorations significatives par rapport aux versions précédentes de Hudi. Pour plus d'informations, consultez le Guide de migration Apache Hudi 0.11.0. Les exemples de cet onglet reflètent ces changements.

val snapshotQueryDF = spark.read .format("hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset") .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://amzn-s3-demo-bucket/myhudidataset' + '/*/*') snapshotQueryDF.show()

Requêtes incrémentielles

Vous pouvez également effectuer des requêtes incrémentielles avec Hudi pour obtenir un flux d'enregistrements modifiés depuis un horodatage de validation donné. Pour ce faire, définissez le champ QUERY_TYPE_OPT_KEY sur QUERY_TYPE_INCREMENTAL_OPT_VAL. Ajoutez ensuite une valeur pour BEGIN_INSTANTTIME_OPT_KEY pour obtenir tous les enregistrements écrits depuis l'heure spécifiée. Les requêtes incrémentielles sont généralement dix fois plus efficaces que leurs homologues par lots, car elles ne traitent que les enregistrements modifiés.

Lorsque vous effectuez des requêtes incrémentielles, utilisez le chemin de la table racine (de base) sans les astérisques génériques utilisés pour les requêtes Snapshot.

Note

Presto ne prend pas en charge les requêtes incrémentielles.

(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://amzn-s3-demo-bucket/myhudidataset" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset') incQueryDF.show()

Pour plus d'informations sur la lecture d'jeux de données Hudi, consultez la rubrique Interrogation de tables Hudi dans la documentation d'Apache Hudi.