Trabalhe com um conjunto de dados do Hudi - Amazon EMR

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Trabalhe com um conjunto de dados do Hudi

O Hudi oferece suporte à inserção, atualização e exclusão de dados em conjuntos de dados do Hudi por meio do Spark. Para obter mais informações, consulte Gravar tabelas do Hudi na documentação do Apache Hudi.

Os exemplos a seguir demonstram como iniciar o shell interativo do Spark, usar o envio do Spark ou usar o Amazon EMR Notebooks para trabalhar com o Hudi na Amazon. EMR Você também pode usar o DeltaStreamer utilitário Hudi ou outras ferramentas para gravar em um conjunto de dados. Nesta seção, os exemplos demonstram como trabalhar com conjuntos de dados usando o shell do Spark enquanto estão conectados ao nó principal usando SSH como usuário padrãohadoop.

Ao executar spark-shell ou spark-sql usar o Amazon EMR 6.7.0 ou posterior, passe os seguintes comandos. spark-submit

nota

A Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

Abrir o shell do Spark no nó primário
  1. Conecte-se ao nó primário usando SSH o. Para obter mais informações, consulte Conecte-se ao nó primário usando SSH o Amazon EMR Management Guide.

  2. Digite o seguinte comando para iniciar o shell do Spark. Para usar a PySpark concha, spark-shell substitua porpyspark.

    spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

Ao executar spark-shell ou spark-sql usar o Amazon EMR 6.6.x ou anterior, passe os seguintes comandos. spark-submit

nota
Abrir o shell do Spark no nó primário
  1. Conecte-se ao nó primário usando SSH o. Para obter mais informações, consulte Conecte-se ao nó primário usando SSH o Amazon EMR Management Guide.

  2. Digite o seguinte comando para iniciar o shell do Spark. Para usar a PySpark concha, spark-shell substitua porpyspark.

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Para usar o Hudi com o Amazon EMR Notebooks, você deve primeiro copiar os arquivos jar do Hudi do sistema de arquivos local para HDFS o nó principal do cluster de notebooks. Em seguida, você usa o editor do notebook para configurar seu EMR notebook para usar o Hudi.

Para usar o Hudi com Amazon Notebooks EMR
  1. Crie e lance um cluster para Amazon EMR Notebooks. Para obter mais informações, consulte Criação de EMR clusters da Amazon para notebooks no Amazon EMR Management Guide.

  2. Conecte-se ao nó principal do cluster usando SSH e, em seguida, copie os arquivos jar do sistema de arquivos local para, HDFS conforme mostrado nos exemplos a seguir. No exemplo, criamos um diretório HDFS para facilitar o gerenciamento de arquivos. Você pode escolher seu próprio destino emHDFS, se desejar.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
  3. Abra o editor do notebook, insira o código do exemplo a seguir e execute-o.

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}

Para usar o Hudi com o Amazon EMR Notebooks, você deve primeiro copiar os arquivos jar do Hudi do sistema de arquivos local para HDFS o nó principal do cluster de notebooks. Em seguida, você usa o editor do notebook para configurar seu EMR notebook para usar o Hudi.

Para usar o Hudi com Amazon Notebooks EMR
  1. Crie e lance um cluster para Amazon EMR Notebooks. Para obter mais informações, consulte Criação de EMR clusters da Amazon para notebooks no Amazon EMR Management Guide.

  2. Conecte-se ao nó principal do cluster usando SSH e, em seguida, copie os arquivos jar do sistema de arquivos local para, HDFS conforme mostrado nos exemplos a seguir. No exemplo, criamos um diretório HDFS para facilitar o gerenciamento de arquivos. Você pode escolher seu próprio destino emHDFS, se desejar.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. Abra o editor do notebook, insira o código do exemplo a seguir e execute-o.

    { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

Inicializar uma sessão do Spark para Hudi

Ao usar o Scala, você deve importar as seguintes classes na sessão do Spark. Isso precisa ser feito uma vez por sessão do Spark.

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig

Gravar em um conjunto de dados do Hudi

Os exemplos a seguir mostram como criar um DataFrame e escrevê-lo como um conjunto de dados Hudi.

nota

Para colar exemplos de código no shell do Spark, digite :paste no prompt, cole o exemplo e pressione CTRL + D.

Cada vez que você grava um DataFrame em um conjunto de dados Hudi, você deve especificar. DataSourceWriteOptions Muitas dessas opções provavelmente serão idênticas entre as operações de gravação. O exemplo a seguir especifica opções comuns usando a variável hudiOptions, usada pelos exemplos subsequentes.

nota

A Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "tableName", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "tableName", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'tableName', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')
nota

Você pode ver “hoodie” em vez de Hudi em exemplos de código e notificações. A base de código do Hudi usa amplamente a antiga grafia “hoodie”.

DataSourceWriteOptions referência para Hudi
Opção Descrição

TABLE_NAME

O nome da tabela com o qual registrar o conjunto de dados.

TABLE_TYPE_OPT_KEY

Opcional. Especifica se o conjunto de dados foi criado como "COPY_ON_WRITE" ou "MERGE_ON_READ". O padrão é "COPY_ON_WRITE".

RECORDKEY_FIELD_OPT_KEY

O campo de chave de registro cujo valor será usado como o componente recordKey de HoodieKey. O valor real será obtido invocando .toString() no valor do campo. Campos aninhados podem ser especificados usando a notação de pontos, por exemplo, a.b.c.

PARTITIONPATH_FIELD_OPT_KEY

O campo de caminho de partição cujo valor será usado como o componente partitionPath de HoodieKey. O valor real será obtido invocando .toString() no valor do campo.

PRECOMBINE_FIELD_OPT_KEY

O campo usado na pré-combinação antes da gravação real. Quando dois registros têm o mesmo valor de chave, o Hudi seleciona aquele com o maior valor para o campo de pré-combinação, conforme determinado por Object.compareTo(..).

As opções a seguir são necessárias apenas para registrar a tabela do conjunto de dados do Hudi no seu metastore. Se você não registrar o conjunto de dados do Hudi como uma tabela no metastore do Hive, essas opções não serão necessárias.

DataSourceWriteOptions referência para Hive
Opção Descrição

HIVE_DATABASE_OPT_KEY

O banco de dados do Hive com o qual sincronizar. O padrão é "default".

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

A classe usada para extrair valores de campo de partição em colunas de partição do Hive.

HIVE_PARTITION_FIELDS_OPT_KEY

O campo no conjunto de dados a ser usado para determinar colunas de partição do Hive.

HIVE_SYNC_ENABLED_OPT_KEY

Quando definido como "true", registra o conjunto de dados no metastore do Apache Hive. O padrão é "false".

HIVE_TABLE_OPT_KEY

Obrigatório. O nome da tabela no Hive com a qual sincronizar. Por exemplo, "my_hudi_table_cow".

HIVE_USER_OPT_KEY

Opcional. O nome de usuário do Hive a ser usado ao sincronizar. Por exemplo, "hadoop".

HIVE_PASS_OPT_KEY

Opcional. A senha do Hive para o usuário especificado por HIVE_USER_OPT_KEY.

HIVE_URL_OPT_KEY

A URL metastore Hive.

Upsert dados

O exemplo a seguir demonstra como alterar dados escrevendo um. DataFrame Ao contrário do exemplo anterior de inserção, o valor OPERATION_OPT_KEY é definido como UPSERT_OPERATION_OPT_VAL. Além disso, .mode(SaveMode.Append) é especificado para indicar que o registro deve ser anexado.

nota

A Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

Excluir um registro

Para excluir um registro de forma irreversível, você pode upsert uma carga útil vazia. Nesse caso, a opção PAYLOAD_CLASS_OPT_KEY especifica a classe EmptyHoodieRecordPayload. O exemplo usa o mesmo DataFrame,updateDF, usado no exemplo upsert para especificar o mesmo registro.

nota

A Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

Você também pode excluir dados de forma irreversível definindo OPERATION_OPT_KEY como DELETE_OPERATION_OPT_VAL para remover todos os registros no conjunto de dados enviado. Para obter instruções sobre como realizar exclusões reversíveis e obter mais informações sobre a exclusão de dados armazenados em tabelas do Hudi, consulte Exclusões na documentação do Apache Hudi.

Ler em um conjunto de dados do Hudi

Para recuperar dados no momento atual, o Hudi realiza consultas de snapshots por padrão. Veja a seguir um exemplo de consulta do conjunto de dados gravado no S3 em Gravar em um conjunto de dados do Hudi. s3://amzn-s3-demo-bucket/myhudidatasetSubstitua pelo caminho da tabela e adicione asteriscos curinga para cada nível de partição, além de um asterisco adicional. Neste exemplo, há um nível de partição, portanto adicionamos dois símbolos curinga.

nota

A Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

val snapshotQueryDF = spark.read .format("hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset") .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://amzn-s3-demo-bucket/myhudidataset' + '/*/*') snapshotQueryDF.show()

Consultas incrementais

Você também pode realizar consultas incrementais com o Hudi para obter um fluxo de registros que foram alterados desde um determinado carimbo de data/hora de confirmação. Para fazer isso, defina o campo QUERY_TYPE_OPT_KEY como QUERY_TYPE_INCREMENTAL_OPT_VAL. Em seguida, adicione um valor para BEGIN_INSTANTTIME_OPT_KEY para obter todos os registros gravados desde a hora especificada. Normalmente, as consultas incrementais são dez vezes mais eficientes do que as de lote, pois processam somente registros alterados.

Ao realizar consultas incrementais, use o caminho da tabela raiz (básica) sem os asteriscos curinga usados nas consultas Snapshot.

nota

O Presto não é compatível com consultas incrementais.

(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://amzn-s3-demo-bucket/myhudidataset" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset') incQueryDF.show()

Para obter mais informações sobre a leitura de conjuntos de dados do Hudi, consulte Consultar tabelas do Hudi na documentação do Apache Hudi.