Using Athena to query Apache Hudi datasets
Apache Hudi is an open-source data management framework that simplifies incremental data processing. Upsert refers to the ability to insert records into an
existing dataset if they do not already exist or to update them if they do.
Hudi handles data insertion and update events without creating many small files that can cause performance issues for analytics. Apache Hudi automatically tracks changes and merges files so that they remain optimally sized. This avoids the need to build custom solutions that monitor and rewrite many small files into fewer large files.
Hudi datasets are suitable for the following use cases:
- Complying with privacy regulations like the General Data Protection Regulation (GDPR) and the California Consumer Privacy Act (CCPA) that enforce people's right to remove personal information or change how their data is used.
- Working with streaming data from sensors and other Internet of Things (IoT) devices that require specific data insertion and update events.
- Implementing a change data capture (CDC) system.
Datasets managed by Hudi are stored in Amazon S3 using open storage formats. Currently, Athena can read compacted Hudi datasets but not write Hudi data. Athena supports up to Hudi version 0.8.0 with Athena engine version 2, and Hudi version 0.14.0 with Athena engine version 3. This is subject to change: Athena cannot guarantee read compatibility with tables that are created with later versions of Hudi. For information about Athena engine versioning, see Athena engine versioning. For more information about Hudi features and versioning, see the Hudi documentation.
Hudi dataset table types
A Hudi dataset can be one of the following types:
- Copy on Write (CoW) – Data is stored in a columnar format (Parquet), and each update creates a new version of files during a write.
- Merge on Read (MoR) – Data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted as needed to create new versions of the columnar files.
With CoW datasets, each time there is an update to a record, the file that contains the record is rewritten with the updated values. With a MoR dataset, each time there is an update, Hudi writes only the row for the changed record. MoR is better suited for write- or change-heavy workloads with fewer reads. CoW is better suited for read-heavy workloads on data that changes less frequently.
Hudi provides three query types for accessing the data:
- Snapshot queries – Queries that see the latest snapshot of the table as of a given commit or compaction action. For MoR tables, snapshot queries expose the most recent state of the table by merging the base and delta files of the latest file slice at the time of the query.
- Incremental queries – Queries see only new data written to the table since a given commit or compaction. This effectively provides change streams to enable incremental data pipelines.
- Read optimized queries – For MoR tables, queries see the latest compacted data. For CoW tables, queries see the latest committed data.
The following table shows the possible Hudi query types for each table type.
Table type | Possible Hudi query types |
---|---|
Copy On Write | snapshot, incremental |
Merge On Read | snapshot, incremental, read optimized |
Currently, Athena supports snapshot queries and read optimized queries, but not incremental queries. On MoR tables, all data exposed to read optimized queries is compacted. This provides good performance but does not include the latest delta commits. Snapshot queries contain the freshest data but incur some computational overhead, which makes them less performant.
For more information about the tradeoffs between table and query types, see Table & Query Types in the Apache Hudi documentation.
Hudi terminology change: Views are now queries
Starting in release version 0.5.1, Apache Hudi changed some of its terminology. What were formerly views are called queries in later releases. The following table summarizes the changes between the old and new terms.
Old term | New term |
---|---|
CoW: read optimized view; MoR: realtime view | Snapshot queries |
Incremental view | Incremental query |
MoR read optimized view | Read optimized query |
Tables from bootstrap operation
Starting in Apache Hudi version 0.6.0, the bootstrap operation feature provides better performance with existing Parquet datasets. Instead of rewriting the dataset, a bootstrap operation can generate metadata only, leaving the dataset in place.
You can use Athena to query tables from a bootstrap operation just like other tables based on data in Amazon S3. In your CREATE TABLE statement, specify the Hudi table path in your LOCATION clause.
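As a sketch only, the DDL follows the same pattern as the copy on write examples later on this page. The table name bootstrap_table, the columns, and the Amazon S3 path below are hypothetical placeholders, not output from an actual bootstrap operation:
-- Hypothetical table registered over a bootstrapped Hudi dataset;
-- columns and S3 path are placeholders
CREATE EXTERNAL TABLE `bootstrap_table`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `id` string,
  `value` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://DOC-EXAMPLE-BUCKET/folder/bootstrap_table/'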
For more information about creating Hudi tables using the bootstrap operation in Amazon EMR, see the article New features from Apache Hudi available in Amazon EMR.
Hudi metadata listing
Apache Hudi has a metadata table that contains indexing features to improve performance, such as file listing, data skipping using column statistics, and bloom filter based indexing.
Of these features, Athena currently supports only the file listing index. The file listing index eliminates file system calls like "list files" by fetching the information from an index that maintains a partition-to-files mapping. This removes the need to recursively list every partition under the table path to get a view of the file system. When you work with large datasets, this indexing drastically reduces the latency that would otherwise occur when getting the list of files during writes and queries. It also avoids bottlenecks like request limit throttling on Amazon S3 LIST calls.
Note
Athena does not support data skipping or bloom filter indexing at this time.
Enabling the Hudi metadata table
Metadata table based file listing is disabled by default. To enable the Hudi metadata table and the related file listing functionality, set the hudi.metadata-listing-enabled table property to TRUE.
Example
The following ALTER TABLE SET TBLPROPERTIES example enables the metadata table on the example partition_cow table.
ALTER TABLE partition_cow SET TBLPROPERTIES('hudi.metadata-listing-enabled'='TRUE')
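To confirm that the property took effect, you can inspect the table properties afterward. This check is optional; the following is a minimal sketch using Athena's SHOW TBLPROPERTIES statement:
-- Optional check: display the metadata listing property on the table
SHOW TBLPROPERTIES partition_cow ('hudi.metadata-listing-enabled')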
Considerations and limitations
- Athena does not support incremental queries.
- Athena does not support CTAS or INSERT INTO on Hudi data. If you would like Athena support for writing Hudi datasets, send feedback to athena-feedback@amazon.com. For more information about writing Hudi data, see the following resources:
  - Working with a Hudi dataset in the Amazon EMR Release Guide.
  - Writing Data in the Apache Hudi documentation.
- Using MSCK REPAIR TABLE on Hudi tables in Athena is not supported. If you need to load a Hudi table not created in AWS Glue, use ALTER TABLE ADD PARTITION.
- Skipping S3 Glacier objects is not supported – If objects in the Apache Hudi table are in an Amazon S3 Glacier storage class, setting the read_restored_glacier_objects table property to false has no effect. For example, suppose you issue the following command:

  ALTER TABLE table_name SET TBLPROPERTIES ('read_restored_glacier_objects' = 'false')

  For Iceberg and Delta Lake tables, the command produces the error Unsupported table property key: read_restored_glacier_objects. For Hudi tables, the ALTER TABLE command does not produce an error, but Amazon S3 Glacier objects are still not skipped. Running SELECT queries after the ALTER TABLE command continues to return all objects.
Additional resources
For more information about using Apache Hudi with Athena, see the following resources.
Video
The following video shows how you can use Amazon Athena to query a read-optimized Apache Hudi dataset in your Amazon S3-based data lake.
Blog posts
The following AWS Big Data Blog posts include descriptions of how you can use Apache Hudi with Athena.
Creating Hudi tables
This section provides examples of CREATE TABLE statements in Athena for partitioned and nonpartitioned tables of Hudi data.
If you have Hudi tables already created in AWS Glue, you can query them directly in Athena. When you create partitioned Hudi tables in Athena, you must run ALTER TABLE ADD PARTITION to load the Hudi data before you can query it.
Copy on write (CoW) create table examples
Nonpartitioned CoW table
The following example creates a nonpartitioned CoW table in Athena.
CREATE EXTERNAL TABLE `non_partition_cow`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_time` string,
  `event_name` string,
  `event_guests` int,
  `event_type` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://DOC-EXAMPLE-BUCKET/folder/non_partition_cow/'
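Once created, the table can be queried like any other Athena table. The following query is a minimal illustration against the example schema above; the column choice and LIMIT are arbitrary:
-- Read a few records from the example CoW table
SELECT event_id, event_name, event_guests
FROM non_partition_cow
LIMIT 10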
Partitioned CoW table
The following example creates a partitioned CoW table in Athena.
CREATE EXTERNAL TABLE `partition_cow`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_time` string,
  `event_name` string,
  `event_guests` int)
PARTITIONED BY (
  `event_type` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://DOC-EXAMPLE-BUCKET/folder/partition_cow/'
The following ALTER TABLE ADD PARTITION example adds two partitions to the example partition_cow table.
ALTER TABLE partition_cow ADD
  PARTITION (event_type = 'one') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_cow/one/'
  PARTITION (event_type = 'two') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_cow/two/'
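After the partitions are loaded, filtering on the partition column lets Athena scan only the matching Amazon S3 prefixes. A minimal sketch using a partition value from the example above:
-- Query only the 'one' partition of the example table
SELECT event_id, event_name
FROM partition_cow
WHERE event_type = 'one'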
Merge on read (MoR) create table examples
Hudi creates two tables in the metastore for MoR: a table for snapshot queries, and a table for read optimized queries. Both tables are queryable. In Hudi versions prior to 0.5.1, the table for read optimized queries had the name that you specified when you created the table. Starting in Hudi version 0.5.1, the table name is suffixed with _ro by default. The name of the table for snapshot queries is the name that you specified appended with _rt.
Nonpartitioned merge on read (MoR) table
The following example creates a nonpartitioned MoR table in Athena for read optimized queries. Note that read optimized queries use the input format HoodieParquetInputFormat.
CREATE EXTERNAL TABLE `nonpartition_mor`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_time` string,
  `event_name` string,
  `event_guests` int,
  `event_type` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://DOC-EXAMPLE-BUCKET/folder/nonpartition_mor/'
The following example creates a nonpartitioned MoR table in Athena for snapshot queries. For snapshot queries, use the input format HoodieParquetRealtimeInputFormat.
CREATE EXTERNAL TABLE `nonpartition_mor_rt`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_time` string,
  `event_name` string,
  `event_guests` int,
  `event_type` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://DOC-EXAMPLE-BUCKET/folder/nonpartition_mor/'
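Both tables can then be queried side by side. As a rough illustration, the first query below reads only compacted data, while the second also merges the latest delta commits at some additional query-time cost:
-- Read optimized query: compacted data only
SELECT count(*) FROM nonpartition_mor

-- Snapshot query: also merges the latest delta commits
SELECT count(*) FROM nonpartition_mor_rt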
Partitioned merge on read (MoR) table
The following example creates a partitioned MoR table in Athena for read optimized queries.
CREATE EXTERNAL TABLE `partition_mor`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_time` string,
  `event_name` string,
  `event_guests` int)
PARTITIONED BY (
  `event_type` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/'
The following ALTER TABLE ADD PARTITION example adds two partitions to the example partition_mor table.
ALTER TABLE partition_mor ADD
  PARTITION (event_type = 'one') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/one/'
  PARTITION (event_type = 'two') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/two/'
The following example creates a partitioned MoR table in Athena for snapshot queries.
CREATE EXTERNAL TABLE `partition_mor_rt`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `event_id` string,
  `event_time` string,
  `event_name` string,
  `event_guests` int)
PARTITIONED BY (
  `event_type` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/'
Similarly, the following ALTER TABLE ADD PARTITION example adds two partitions to the example partition_mor_rt table.
ALTER TABLE partition_mor_rt ADD
  PARTITION (event_type = 'one') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/one/'
  PARTITION (event_type = 'two') LOCATION 's3://DOC-EXAMPLE-BUCKET/folder/partition_mor/two/'
Additional resources
- For information about using AWS Glue custom connectors and AWS Glue 2.0 jobs to create an Apache Hudi table that you can query with Athena, see Writing to Apache Hudi tables using AWS Glue custom connector in the AWS Big Data Blog.
- For an article about using Apache Hudi, AWS Glue, and Amazon Athena to build a data processing framework for a data lake, see Simplify operational data processing in data lakes using AWS Glue and Apache Hudi in the AWS Big Data Blog.