Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Uso de un conjunto de datos de Hudi
Hudi permite insertar, actualizar y eliminar datos de conjuntos de datos de Hudi a través de Spark. Para obtener más información, consulte Escritura de tablas de Hudi
En los siguientes ejemplos, se muestra cómo iniciar el intérprete de comandos interactivo de Spark, utilizar el envío de Spark o usar Cuadernos de Amazon EMR para trabajar con Hudi en Amazon EMR. También puedes usar la DeltaStreamer utilidad Hudi u otras herramientas para escribir en un conjunto de datos. En esta sección, los ejemplos muestran cómo trabajar con datasets usando el shell de Spark mientras está conectado al nodo principal usando SSH como usuario predeterminado de hadoop
.
Cuando ejecute spark-shell
, spark-submit
o spark-sql
con Amazon EMR 6.7.0 o una versión posterior, ejecute los siguientes comandos.
nota
Amazon EMR 6.7.0 usa Apache Hudi
Para abrir el intérprete de comandos de Spark en el nodo principal
-
Conéctese al nodo principal mediante SSH. Para obtener más información, consulte Conectarse al nodo principal mediante SSH en la Guía de administración de Amazon EMR.
-
Introduzca el siguiente comando para iniciar el shell de Spark. Para usar la PySpark carcasa, sustitúyala por
spark-shell
.pyspark
spark-shell
--jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
Cuando ejecute spark-shell
, spark-submit
o spark-sql
con Amazon EMR 6.6.x o una versión posterior, ejecute los siguientes comandos.
nota
-
Amazon EMR 6.2 y 5.31 y versiones posteriores (Hudi 0.6.x y versiones posteriores) pueden omitir
spark-avro.jar
en la configuración. -
Amazon EMR 6.5 y 5.35 y versiones posteriores (Hudi 0.9.x y versiones posteriores) pueden omitir
spark.sql.hive.convertMetastoreParquet=false
en la configuración. -
Amazon EMR 6.6 y 5.36 y versiones posteriores (Hudi 0.10.x y versiones posteriores) deben incluir la configuración de
HoodieSparkSessionExtension
descrita en la Guía de la versión 0.10.0 de Spark: --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
Para abrir el intérprete de comandos de Spark en el nodo principal
-
Conéctese al nodo principal mediante SSH. Para obtener más información, consulte Conectarse al nodo principal mediante SSH en la Guía de administración de Amazon EMR.
-
Introduzca el siguiente comando para iniciar el shell de Spark. Para usar la PySpark carcasa,
spark-shell
sustitúyala porpyspark
.spark-shell
\ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Para utilizar Hudi con Cuadernos de Amazon EMR, antes debe copiar los archivos jar de Hudi del sistema de archivos local a HDFS en el nodo maestro del clúster de bloc de notas. A continuación, puede utilizar el editor de cuadernos para configurar el cuaderno de EMR de modo que utilice Hudi.
Para usar Hudi con Cuadernos de Amazon EMR
-
Cree y lance un clúster para Cuadernos de Amazon EMR. Para más información, consulte Creación de clústeres de Amazon EMR para cuadernos en la Guía de administración de Amazon EMR.
-
Conéctese al nodo principal del clúster mediante SSH y, a continuación, copie los archivos jar desde el sistema de archivos local a HDFS, como se muestra en los ejemplos siguientes. En el ejemplo, creamos un directorio en HDFS para que la administración de archivos resulte más clara. Puede elegir su propio destino en HDFS, si lo desea.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
-
Abra el editor de bloc de notas, escriba el código del siguiente ejemplo y ejecútelo.
%%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}
Para utilizar Hudi con Cuadernos de Amazon EMR, antes debe copiar los archivos jar de Hudi del sistema de archivos local a HDFS en el nodo maestro del clúster de bloc de notas. A continuación, puede utilizar el editor de cuadernos para configurar el cuaderno de EMR de modo que utilice Hudi.
Para usar Hudi con Cuadernos de Amazon EMR
-
Cree y lance un clúster para Cuadernos de Amazon EMR. Para más información, consulte Creación de clústeres de Amazon EMR para cuadernos en la Guía de administración de Amazon EMR.
-
Conéctese al nodo principal del clúster mediante SSH y, a continuación, copie los archivos jar desde el sistema de archivos local a HDFS, como se muestra en los ejemplos siguientes. En el ejemplo, creamos un directorio en HDFS para que la administración de archivos resulte más clara. Puede elegir su propio destino en HDFS, si lo desea.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
-
Abra el editor de bloc de notas, escriba el código del siguiente ejemplo y ejecútelo.
{ "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}
Iniciar una sesión de Spark para Hudi
Al usar Scala, debe importar las siguientes clases a su sesión de Spark. Esto debe hacerse una vez por cada sesión de Spark.
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig
Escribir en un conjunto de datos de Hudi
Los siguientes ejemplos muestran cómo crear un conjunto de datos de Hudi DataFrame y cómo escribirlo como tal.
nota
Para pegar muestras de código en el shell de Spark, escriba :paste
en el símbolo del sistema, pegue el ejemplo y, a continuación, pulse CTRL
+ D
.
Cada vez que escribas DataFrame a en un conjunto de datos de Hudi, debes especificarlo. DataSourceWriteOptions
Es probable que muchas de estas opciones sean idénticas en varias operaciones de escritura. En el ejemplo siguiente se especifican las opciones comunes mediante la variable
, que se usa en los ejemplos posteriores.hudiOptions
nota
Amazon EMR 6.7.0 usa Apache Hudi
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName
", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': '
tableName
', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName
', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
nota
Es posible que vea “hoodie” en lugar de Hudi en los ejemplos de código y las notificaciones. El código base de Hudi utiliza ampliamente la antigua palabra “hoodie”.
Opción | Descripción |
---|---|
TABLE_NAME |
Nombre de la tabla bajo la que se registra el dataset. |
TABLE_TYPE_OPT_KEY |
Opcional. Especifica si el dataset que se va a crear será de tipo |
RECORDKEY_FIELD_OPT_KEY |
Campo de clave de registro cuyo valor se utilizará como componente |
PARTITIONPATH_FIELD_OPT_KEY |
Campo de la ruta de partición cuyo valor se utilizará como componente |
PRECOMBINE_FIELD_OPT_KEY |
Campo utilizado en la combinación previa antes de la escritura real. Cuando dos registros tienen el mismo valor de clave, Hudi selecciona el que tiene el valor mayor para el campo de combinación previa, según lo determinado por |
Las siguientes opciones solo son necesarias para registrar la tabla del conjunto de datos de Hudi en su metaalmacén. Si no registra el conjunto de datos de Hudi como tabla en el metaalmacén de Hudi, estas opciones no son necesarias.
Opción | Descripción |
---|---|
HIVE_DATABASE_OPT_KEY |
Base de datos Hive con la que se realizará la sincronización. El valor predeterminado es |
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY |
Clase utilizada para extraer valores de los campos de partición a las columnas de partición de Hive. |
HIVE_PARTITION_FIELDS_OPT_KEY |
Campo del dataset que se utilizará para determinar las columnas de partición de Hive. |
HIVE_SYNC_ENABLED_OPT_KEY |
Cuando se establece en |
HIVE_TABLE_OPT_KEY |
Obligatorio. Nombre de la tabla de Hive con la que se va a realizar la sincronización. Por ejemplo, |
HIVE_USER_OPT_KEY |
Opcional. Nombre de usuario de Hive que se va a utilizar al sincronizar. Por ejemplo, |
HIVE_PASS_OPT_KEY |
Opcional. Contraseña de Hive para el usuario especificado por |
HIVE_URL_OPT_KEY |
Dirección URL del metaalmacén de Hive. |
Datos de upsert
El siguiente ejemplo muestra cómo alterar los datos escribiendo un. DataFrame A diferencia del ejemplo de inserción anterior, el valor de OPERATION_OPT_KEY
se establece en UPSERT_OPERATION_OPT_VAL
. Además, se especifica .mode(SaveMode.Append)
para indicar que el registro se debe anexar.
nota
Amazon EMR 6.7.0 usa Apache Hudi
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("
new_value
")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('
new_value
')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
Eliminación de un registro
Para eliminar un registro de forma permanente, puede alterar una carga útil vacía. En este caso, la opción PAYLOAD_CLASS_OPT_KEY
especifica la clase EmptyHoodieRecordPayload
. En el ejemplo se utiliza lo mismo DataFrame que en el ejemplo de upsert para especificar el mismo registro. updateDF
nota
Amazon EMR 6.7.0 usa Apache Hudi
(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('
s3://amzn-s3-demo-bucket/myhudidataset/
')
También puede eliminar datos de forma permanente si configura OPERATION_OPT_KEY
como DELETE_OPERATION_OPT_VAL
para eliminar todos los registros del conjunto de datos que envíe. Para obtener instrucciones sobre cómo llevar a cabo eliminaciones temporales y para obtener más información sobre cómo eliminar los datos almacenados en las tablas de Hudi, consulte Eliminaciones
Leer un conjunto de datos de Hudi
Para recuperar los datos en el momento actual, Hudi lleva a cabo consultas instantáneas de forma predeterminada. El siguiente es un ejemplo de consulta del conjunto de datos escrito en S3 en Escribir en un conjunto de datos de Hudi. s3://amzn-s3-demo-bucket/myhudidataset
Sustitúyala por la ruta de la tabla y añada asteriscos comodín para cada nivel de partición, además de un asterisco adicional. En este ejemplo, hay un nivel de partición, por lo que hemos agregado dos símbolos comodín.
nota
Amazon EMR 6.7.0 usa Apache Hudi
val snapshotQueryDF = spark.read .format("hudi") .load(
"s3://amzn-s3-demo-bucket/myhudidataset"
) .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("
s3://amzn-s3-demo-bucket/myhudidataset
" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('
s3://amzn-s3-demo-bucket/myhudidataset
' + '/*/*') snapshotQueryDF.show()
Consultas incrementales
También puede hacer consultas incrementales con Hudi para obtener un flujo de registros que han cambiado desde una fecha de confirmación determinada. Para ello, defina el campo QUERY_TYPE_OPT_KEY
en QUERY_TYPE_INCREMENTAL_OPT_VAL
. A continuación, agregue un valor para BEGIN_INSTANTTIME_OPT_KEY
para obtener todos los registros escritos desde el momento especificado. Las consultas incrementales suelen ser diez veces más eficientes que las consultas por lotes, ya que solo procesan los registros modificados.
Al hacer consultas incrementales, utilice la ruta de la tabla raíz (base) sin los asteriscos comodín que se utilizan en las consultas instantáneas.
nota
Presto no admite consultas incrementales.
(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
<beginInstantTime>
) .load("s3://amzn-s3-demo-bucket/myhudidataset
" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime':
<beginInstantTime>
, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset
') incQueryDF.show()
Para obtener más información acerca de la lectura de los conjuntos de datos de Hudi, consulte la sección sobre Cómo consultar tablas de Hudi