Apache Hudi and Lake Formation with Amazon EMR - Amazon EMR

Apache Hudi and Lake Formation with Amazon EMR

Amazon EMR releases 6.15.0 and higher include support for fine-grained access control based on AWS Lake Formation with Apache Hudi when you read and write data with Spark SQL. Amazon EMR supports table, row, column, and cell-level access control with Apache Hudi. With this feature, you can run snapshot queries on copy-on-write tables to query the latest snapshot of the table at a given commit or compaction instant.

Currently, a Lake Formation-enabled Amazon EMR cluster must retrieve Hudi's commit time column to perform incremental queries and time travel queries. It doesn't support Spark's timestamp as of syntax and the Spark.read() function. The correct syntax is select * from table where _hoodie_commit_time <= point_in_time. For more information, see Point in time Time-Travel queries on Hudi table.

The following support matrix lists some core features of Apache Hudi with Lake Formation:

Copy on Write Merge on Read

Snapshot queries - Spark SQL

Read-optimized queries - Spark SQL

Incremental queries

Time travel queries

Metadata tables

DML INSERT commands

DDL commands

Spark datasource queries

Spark datasource writes

Querying Hudi tables

This section shows how you can run the supported queries described above on a Lake Formation enabled cluster. The table should be a registered catalog table.

  1. To start the Spark shell, use the following commands.

    spark-sql --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \ --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \ --conf spark.sql.catalog.spark_catalog.lf.managed=true

    If you want Lake Formation to use record server to manage your Spark catalog, set spark.sql.catalog.<managed_catalog_name>.lf.managed to true.

  2. To query the latest snapshot of copy-on-write tables, use the following commands.

    SELECT * FROM my_hudi_cow_table
    spark.read.table("my_hudi_cow_table")
  3. To query the latest compacted data of MOR tables, you can query the read-optimized table that is suffixed with _ro:

    SELECT * FROM my_hudi_mor_table_ro
    spark.read.table("my_hudi_mor_table_ro")
Note

The performance of reads on Lake Formation clusters might be slower because of optimizations that are not supported. These features include file listing based on Hudi metadata, and data skipping. We recommend that you test your application performance to ensure that it meets your requirements.