Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Travailler avec un jeu de données Hudi
Hudi prend en charge l'insertion, la mise à jour et la suppression de données dans des jeux de données Hudi via Spark. Pour plus d'informations, consultez Writing Hudi tables
Les exemples suivants montrent comment lancer le shell interactif Spark, utiliser Spark submit ou utiliser Amazon EMR Notebooks pour travailler avec Hudi sur Amazon. EMR Vous pouvez également utiliser l' DeltaStreamer utilitaire Hudi ou d'autres outils pour écrire dans un ensemble de données. Tout au long de cette section, les exemples montrent comment utiliser des ensembles de données à l'aide du shell Spark lorsque vous êtes connecté au nœud principal en SSH tant qu'hadoop
utilisateur par défaut.
Lorsque vous exécutez spark-shell
ou spark-sql
utilisez Amazon EMR 6.7.0 ou version ultérieure, passez les commandes suivantes. spark-submit
Note
Amazon EMR 6.7.0 utilise Apache Hudi
Pour ouvrir le shell Spark sur le nœud primaire
-
Connectez-vous au nœud principal à l'aide deSSH. Pour plus d'informations, consultez Se connecter au nœud principal SSH à l'aide de l'Amazon EMR Management Guide.
-
Entrez la commande suivante pour lancer le shell Spark. Pour utiliser la PySpark coque, remplacez-la
spark-shell
parpyspark
.spark-shell
--jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
Lorsque vous exécutez spark-shell
ou spark-sql
utilisez Amazon EMR 6.6.x ou une version antérieure, passez les commandes suivantes. spark-submit
Note
-
Amazon EMR 6.2, 5.31 et versions ultérieures (Hudi 0.6.x et versions ultérieures) peuvent les omettre de la configuration.
spark-avro.jar
-
Amazon EMR 6.5, 5.35 et versions ultérieures (Hudi 0.9.x et versions ultérieures) peuvent être omis
spark.sql.hive.convertMetastoreParquet=false
de la configuration. -
Amazon EMR 6.6, 5.36 et versions ultérieures (Hudi 0.10.x et versions ultérieures) doivent inclure la
HoodieSparkSessionExtension
configuration telle que décrite dans le guide Spark de la version : 0.10.0 :--conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
Pour ouvrir le shell Spark sur le nœud primaire
-
Connectez-vous au nœud principal à l'aide deSSH. Pour plus d'informations, consultez Se connecter au nœud principal SSH à l'aide de l'Amazon EMR Management Guide.
-
Entrez la commande suivante pour lancer le shell Spark. Pour utiliser la PySpark coque, remplacez-la
spark-shell
parpyspark
.spark-shell
\ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Pour utiliser Hudi avec Amazon EMR Notebooks, vous devez d'abord copier les fichiers JAR Hudi du système de fichiers local vers HDFS le nœud principal du cluster de blocs-notes. Vous utilisez ensuite l'éditeur de bloc-notes pour configurer votre EMR bloc-notes afin qu'il utilise Hudi.
Pour utiliser Hudi avec Amazon Notebooks EMR
-
Créez et lancez un cluster pour Amazon EMR Notebooks. Pour plus d'informations, consultez la section Création de EMR clusters Amazon pour ordinateurs portables dans le guide EMR de gestion Amazon.
-
Connectez-vous au nœud principal du cluster à l'aide SSH des fichiers jar du système de fichiers local, puis copiez-les HDFS comme indiqué dans les exemples suivants. Dans l'exemple, nous créons un répertoire dans HDFS pour faciliter la gestion des fichiers. Vous pouvez choisir votre propre destinationHDFS, si vous le souhaitez.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
-
Ouvrez l'éditeur de bloc-notes, entrez le code de l'exemple suivant et exécutez-le.
%%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}
Pour utiliser Hudi avec Amazon EMR Notebooks, vous devez d'abord copier les fichiers JAR Hudi du système de fichiers local vers HDFS le nœud principal du cluster de blocs-notes. Vous utilisez ensuite l'éditeur de bloc-notes pour configurer votre EMR bloc-notes afin qu'il utilise Hudi.
Pour utiliser Hudi avec Amazon Notebooks EMR
-
Créez et lancez un cluster pour Amazon EMR Notebooks. Pour plus d'informations, consultez la section Création de EMR clusters Amazon pour ordinateurs portables dans le guide EMR de gestion Amazon.
-
Connectez-vous au nœud principal du cluster à l'aide SSH des fichiers jar du système de fichiers local, puis copiez-les HDFS comme indiqué dans les exemples suivants. Dans l'exemple, nous créons un répertoire dans HDFS pour faciliter la gestion des fichiers. Vous pouvez choisir votre propre destinationHDFS, si vous le souhaitez.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
-
Ouvrez l'éditeur de bloc-notes, entrez le code de l'exemple suivant et exécutez-le.
{ "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}
Initialisation d'une session Spark pour Hudi
Lorsque vous utilisez Scala, vous devez importer les classes suivantes dans votre session Spark. Vous devez le faire une fois par session Spark.
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig
Écrire dans un jeu de données Hudi
Les exemples suivants montrent comment créer un jeu de données Hudi DataFrame et l'écrire sous forme de jeu de données Hudi.
Note
Pour coller des exemples de code dans le shell Spark, tapez :paste
à l'invite, collez l'exemple, puis appuyez sur CTRL
+ D
.
Chaque fois que vous écrivez un dans DataFrame un jeu de données Hudi, vous devez spécifierDataSourceWriteOptions
. Beaucoup de ces options sont susceptibles d'être identiques dans les opérations d'écriture. L'exemple suivant spécifie les options communes à l'aide de la variable
, que les exemples suivants utilisent.hudiOptions
Note
Amazon EMR 6.7.0 utilise Apache Hudi
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName
", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': '
tableName
', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName
', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
Note
Vous pouvez voir « hoodie » au lieu de Hudi dans les exemples de code et les notifications. La base de code Hudi utilise largement l'ancienne orthographe « hoodie ».
Option | Description |
---|---|
TABLE_NAME |
Nom de la table sous laquelle enregistrer l'ensemble de données. |
TABLE_TYPE_OPT_KEY |
Facultatif. Spécifie si l'ensemble de données est créé en tant que |
RECORDKEY_FIELD_OPT_KEY |
Champ de clé d'enregistrement dont la valeur sera utilisée en tant que composant |
PARTITIONPATH_FIELD_OPT_KEY |
Champ de chemin de partition dont la valeur sera utilisée en tant que composant |
PRECOMBINE_FIELD_OPT_KEY |
Champ utilisé dans la pré-combinaison avant l'écriture réelle. Lorsque deux enregistrements ont la même valeur de clé, Hudi sélectionne celui avec la plus grande valeur pour le champ de pré-combinaison, comme déterminé par |
Les options suivantes sont nécessaires uniquement pour enregistrer la table du jeu de données Hudi dans votre metastore. Si vous n'enregistrez pas votre jeu de données Hudi en tant que table dans le metastore Hive, ces options ne sont pas requises.
Option | Description |
---|---|
HIVE_DATABASE_OPT_KEY |
Base de données Hive avec laquelle synchroniser. L’argument par défaut est |
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY |
Classe utilisée pour extraire les valeurs de champ de partition dans les colonnes de partition Hive. |
HIVE_PARTITION_FIELDS_OPT_KEY |
Champ dans l'ensemble de données à utiliser pour déterminer les colonnes de partition Hive. |
HIVE_SYNC_ENABLED_OPT_KEY |
Lorsqu'elle est définie sur |
HIVE_TABLE_OPT_KEY |
Obligatoire. Nom de la table dans Hive avec laquelle synchroniser. Par exemple, |
HIVE_USER_OPT_KEY |
Facultatif. Nom d'utilisateur Hive à utiliser lors de la synchronisation. Par exemple, |
HIVE_PASS_OPT_KEY |
Facultatif. Mot de passe Hive pour l'utilisateur spécifié par |
HIVE_URL_OPT_KEY |
Le URL métastore Hive. |
Données d'Upsert
L'exemple suivant montre comment insérer des données en écrivant un DataFrame. Contrairement à l'exemple d'insertion précédent, la valeur OPERATION_OPT_KEY
est définie sur UPSERT_OPERATION_OPT_VAL
. En outre, .mode(SaveMode.Append)
est spécifié pour indiquer que l'enregistrement doit être ajouté.
Note
Amazon EMR 6.7.0 utilise Apache Hudi
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("
new_value
")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('
new_value
')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
Supprimer un enregistrement
Pour supprimer définitivement un enregistrement, vous pouvez insérer une charge utile vide. Dans ce cas, l'option PAYLOAD_CLASS_OPT_KEY
spécifie la classe EmptyHoodieRecordPayload
. L'exemple utilise le même DataFrameupdateDF
, utilisé dans l'exemple upsert pour spécifier le même enregistrement.
Note
Amazon EMR 6.7.0 utilise Apache Hudi
(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('
s3://amzn-s3-demo-bucket/myhudidataset/
')
Vous pouvez également supprimer définitivement des données en définissant OPERATION_OPT_KEY
sur DELETE_OPERATION_OPT_VAL
pour supprimer tous les enregistrements de le jeu de données que vous soumettez. Pour obtenir des instructions sur les suppressions logicielles et pour plus d'informations sur la suppression de données stockées dans des tables Hudi, consultez Suppressions
Lire à partir d'un jeu de données Hudi
Pour récupérer les données à l'heure actuelle, Hudi effectue des requêtes instantanées par défaut. Voici un exemple d'interrogation de le jeu de données écrit dans S3 dans Écrire dans un jeu de données Hudi. s3://amzn-s3-demo-bucket/myhudidataset
Remplacez-le par le chemin de votre table et ajoutez des astérisques génériques pour chaque niveau de partition, ainsi qu'un astérisque supplémentaire. Dans cet exemple, il existe un niveau de partition. Nous avons donc ajouté deux symboles génériques.
Note
Amazon EMR 6.7.0 utilise Apache Hudi
val snapshotQueryDF = spark.read .format("hudi") .load(
"s3://amzn-s3-demo-bucket/myhudidataset"
) .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("
s3://amzn-s3-demo-bucket/myhudidataset
" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('
s3://amzn-s3-demo-bucket/myhudidataset
' + '/*/*') snapshotQueryDF.show()
Requêtes incrémentielles
Vous pouvez également effectuer des requêtes incrémentielles avec Hudi pour obtenir un flux d'enregistrements modifiés depuis un horodatage de validation donné. Pour ce faire, définissez le champ QUERY_TYPE_OPT_KEY
sur QUERY_TYPE_INCREMENTAL_OPT_VAL
. Ajoutez ensuite une valeur pour BEGIN_INSTANTTIME_OPT_KEY
pour obtenir tous les enregistrements écrits depuis l'heure spécifiée. Les requêtes incrémentielles sont généralement dix fois plus efficaces que leurs homologues par lots, car elles ne traitent que les enregistrements modifiés.
Lorsque vous effectuez des requêtes incrémentielles, utilisez le chemin de la table racine (de base) sans les astérisques génériques utilisés pour les requêtes Snapshot.
Note
Presto ne prend pas en charge les requêtes incrémentielles.
(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
<beginInstantTime>
) .load("s3://amzn-s3-demo-bucket/myhudidataset
" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime':
<beginInstantTime>
, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset
') incQueryDF.show()
Pour plus d'informations sur la lecture d'jeux de données Hudi, consultez la rubrique Interrogation de tables Hudi