Usar a estrutura Hudi no AWS Glue - AWS Glue

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Usar a estrutura Hudi no AWS Glue

O AWS Glue 3.0 e versões posteriores são compatíveis com a estrutura Apache Hudi para data lakes. Hudi é uma estrutura de armazenamento de data lake de código aberto que simplifica o processamento incremental de dados e o desenvolvimento de pipelines de dados. Este tópico aborda os recursos disponíveis para usar dados no AWS Glue ao transportar ou armazenar dados em uma tabela do Hudi. Para saber mais sobre o Hudi, consulte a documentação do Apache Hudi.

Você pode usar o AWS Glue para realizar operações de leitura e gravação em tabelas do Hudi no Amazon S3 ou trabalhar com tabelas do Hudi usando o AWS Glue Data Catalog. Operações adicionais, incluindo inserção, atualização e todas as operações do Apache Spark também são compatíveis.

nota

O Apache Hudi 0.10.1 para AWS Glue 3.0 não é compatível com tabelas Merge on Read (MoR) do Hudi.

A tabela a seguir lista a versão do Hudi incluída em cada versão do AWS Glue.

Versão do AWS Glue Versões compatíveis do Hudi
4,0 0.12.1
3.0 0.10.1

Para saber mais sobre as estruturas de data lake compatíveis com o AWS Glue, consulte Usar estruturas de data lake com trabalhos do AWS Glue ETL.

Habilitar o Hudi

Para habilitar o Hudi para AWS Glue, faça as seguintes tarefas:

  • Especifique hudi como um valor para o parâmetro de trabalho --datalake-formats. Para ter mais informações, consulte Parâmetros de trabalho do AWS Glue.

  • Crie uma chave denominada --conf para o trabalho do AWS Glue e defina-a com o seguinte valor. Ou então, você pode definir a configuração a seguir usando SparkConf no script. Essas configurações ajudam o Apache Spark a lidar corretamente com as tabelas do Hudi.

    spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
  • O suporte à permissão do Lake Formation para tabelas do Hudi está habilitado por padrão para o AWS Glue 4.0. Nenhuma configuração adicional é necessária para leitura/escrita em tabelas do Hudi registradas no Lake Formation. Para ler uma tabela do Hudi registrada, o perfil do IAM do trabalho do AWS Glue deve ter a permissão SELECT. Para escrever em uma tabela do Hudi registrada, o perfil do IAM do trabalho do AWS Glue deve ter a permissão SUPER. Para saber mais sobre como gerenciar as permissões do Lake Formation, consulte Conceder e revogar permissões nos recursos do Catálogo de Dados.

Usar uma versão diferente do Hudi

Para usar uma versão do Hudi que não é compatível com o AWS Glue, especifique seus próprios arquivos JAR do Hudi usando o parâmetro de trabalho --extra-jars. Não inclua hudi como um valor para o parâmetro de trabalho --datalake-formats.

Exemplo: escrever uma tabela Hudi no Amazon S3 e registrá-la no AWS Glue Data Catalog

O exemplo a seguir demonstra como gravar uma tabela do Hudi no Amazon S3 e registrá-la no AWS Glue Data Catalog. O exemplo usa a ferramenta Hive Sync do Hudi para registrar a tabela.

nota

Este exemplo exige que você defina o parâmetro de trabalho --enable-glue-datacatalog para usar o AWS Glue Data Catalog como metastore do Apache Spark Hive. Para saber mais, consulte Parâmetros de trabalho do AWS Glue.

Python
# Example: Create a Hudi table from a DataFrame # and register the table to Glue Data Catalog additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms", "path": "s3://<s3Path/>" } dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save()
Scala
// Example: Example: Create a Hudi table from a DataFrame // and register the table to Glue Data Catalog val additionalOptions = Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms", "path" -> "s3://<s3Path/>") dataFrame.write.format("hudi") .options(additionalOptions) .mode("append") .save()

Exemplo: ler uma tabela do Hudi do Amazon S3 usando o AWS Glue Data Catalog

Este exemplo lê a tabela Hudi que você criou no Exemplo: escrever uma tabela Hudi no Amazon S3 e registrá-la no AWS Glue Data Catalog do Amazon S3.

nota

Este exemplo exige que você defina o parâmetro de trabalho --enable-glue-datacatalog para usar o AWS Glue Data Catalog como metastore do Apache Spark Hive. Para saber mais, consulte Parâmetros de trabalho do AWS Glue.

Python

Para este exemplo, use o método GlueContext.create_data_frame.from_catalog().

# Example: Read a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) dataFrame = glueContext.create_data_frame.from_catalog( database = "<your_database_name>", table_name = "<your_table_name>" )
Scala

Neste exemplo, use o método getCatalogSource.

// Example: Read a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame() } }

Exemplo: atualizar e inserir um DataFrame a em uma tabela do Hudi no Amazon S3

Este exemplo usa o AWS Glue Data Catalog para inserir um DataFrame na tabela do Hudi que você criou em Exemplo: escrever uma tabela Hudi no Amazon S3 e registrá-la no AWS Glue Data Catalog.

nota

Este exemplo exige que você defina o parâmetro de trabalho --enable-glue-datacatalog para usar o AWS Glue Data Catalog como metastore do Apache Spark Hive. Para saber mais, consulte Parâmetros de trabalho do AWS Glue.

Python

Para este exemplo, use o método GlueContext.write_data_frame.from_catalog().

# Example: Upsert a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame = dataFrame, database = "<your_database_name>", table_name = "<your_table_name>", additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms" } )
Scala

Neste exemplo, use o método getCatalogSink.

// Example: Upsert a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.JsonOptions import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext) } }

Exemplo: ler uma tabela do Hudi do Amazon S3 usando o Spark

Este exemplo lê uma tabela do Hudi do Amazon S3 usando a API do Spark.

Python
# Example: Read a Hudi table from S3 using a Spark DataFrame dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
Scala
// Example: Read a Hudi table from S3 using a Spark DataFrame val dataFrame = spark.read.format("hudi").load("s3://<s3path/>")

Exemplo: gravar uma tabela no Amazon S3 usando o Spark

Este exemplo grava uma tabela no Amazon S3 usando o Spark.

Python
# Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save("s3://<s3Path/>)
Scala
// Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") .options(additionalOptions) .mode("overwrite") .save("s3://<s3path/>")

Exemplo: ler e escrever na tabela do Hudi com o controle de permissão do Lake Formation

Este exemplo lê e escreve uma tabela do Hudi com o controle de permissão do Lake Formation

  1. Criar uma tabela do Hudi e registrá-la no Lake Formation

    1. Para habilitar o controle de permissão do Lake Formation, primeiro será necessário registrar o caminho da tabela do Amazon S3 no Lake Formation. Para obter mais informações, consulte Registering an Amazon S3 location (Registrar um local do Amazon S3). Você pode registrá-lo no console do Lake Formation ou usando a AWS CLI:

      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>

      Depois de registrar uma localização no Amazon S3, qualquer tabela do AWS Glue apontando para a localização (ou qualquer uma de suas localizações secundárias) retornará o valor do parâmetro IsRegisteredWithLakeFormation como verdadeiro na chamada GetTable.

    2. Crie uma tabela do Hudi que aponte para o caminho registrado do Amazon S3 por meio da API de dataframe do Spark:

      hudi_options = { 'hoodie.table.name': table_name, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'path': <S3_TABLE_LOCATION>, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': database_name, 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms' } df_products.write.format("hudi") \ .options(**hudi_options) \ .mode("overwrite") \ .save()
  2. Conceda permissão do Lake Formation para o perfil do IAM do trabalho do AWS Glue. Você pode conceder permissões no console do Lake Formation ou usando a AWS CLI. Para obter mais informações, consulte Conceder permissões de banco de dados usando o console do Lake Formation e o método de recurso nomeado

  3. Leia a tabela do Hudi registrada no Lake Formation. O código é o mesmo que o da leitura de uma tabela do Hudi não registrada. Observe que o perfil do IAM do trabalho do AWS Glue precisa ter a permissão SELECT para que a leitura seja bem-sucedida.

    val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame()
  4. Escreva em uma tabela do Hudi registrada no Lake Formation. O código é o mesmo que o da escrita em uma tabela do Hudi não registrada. Observe que o perfil do IAM do trabalho do AWS Glue precisa ter a permissão SUPER para que a escrita seja bem-sucedida.

    glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "<write_operation>", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext)