将 CTAS 和 INSERT INTO 用于 ETL 和数据分析
在 Athena 中使用 Create Table as Select (CTAS) 和 INSERT INTO 语句可将数据提取、转换和加载 (ETL) 到 Amazon S3 中以进行数据处理。本主题说明了如何使用这些语句对数据集进行分区并将其转换为列式数据格式,以对其进行优化来进行数据分析。
CTAS 语句使用标准 SELECT 查询来创建新表。您可以使用 CTAS 语句创建数据子集以进行分析。在一个 CTAS 语句中,您可以对数据进行分区、指定压缩并将数据转换为列状格式,如 Apache Parquet 或 Apache ORC。在运行 CTAS 查询时,该查询创建的表和分区将自动添加到 AWS Glue Data Catalog
INSERT INTO 语句基于在源表上运行的 SELECT 查询语句将新行插入到目标表中。您可以使用 INSERT INTO 语句通过 CTAS 支持的所有转换来转换 CSV 格式的源表数据并将其加载到目标表数据中。
概述
在 Athena 中,使用 CTAS 语句执行数据的初始批量转换。然后,使用多个 INSERT INTO 语句对 CTAS 语句所创建的表进行增量更新。
步骤 1:基于原始数据集创建表
本主题中的示例使用 Simple Storage Service (Amazon S3) 可读取的公开发布的 NOAA global historical climatology network daily (GHCN-d)
Location: s3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/ Total objects: 41727 Size of CSV dataset: 11.3 GB Region: us-east-1
原始数据存储在不带分区的 Amazon S3 中。文件中的数据采用 CSV 格式,如下所示。
2019-10-31 13:06:57 413.1 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0000 2019-10-31 13:06:57 412.0 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0001 2019-10-31 13:06:57 34.4 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0002 2019-10-31 13:06:57 412.2 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0100 2019-10-31 13:06:57 412.7 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0101
此示例中的文件大小相对较小。通过将这些文件合并为更大的文件,可以减少文件总数,从而优化查询执行的性能。可以使用 CTAS 和 INSERT INTO 语句来增强查询性能。
创建基于示例数据集的数据库和表
-
在 Athena 控制台中,选择 US East (N. Virginia) [美国东部(弗吉尼亚北部)]AWS 区域。请务必在
us-east-1
中运行本教程中的所有查询。 -
在 Athena 查询编辑器中,运行 CREATE DATABASE 命令以创建数据库。
CREATE DATABASE blogdb
-
运行以下语句以创建表。
CREATE EXTERNAL TABLE `blogdb`.`original_csv` ( `id` string, `date` string, `element` string, `datavalue` bigint, `mflag` string, `qflag` string, `sflag` string, `obstime` bigint) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/'
步骤 2:使用 CTAS 对数据进行分区、转换和压缩
创建表后,您可以使用单个 CTAS 语句通过 Snappy 压缩将数据转换为 Parquet 格式,并按年份对数据进行分区。
您在步骤 1 中创建的表具有一个 date
字段,其数据格式化为 YYYYMMDD
(例如,20100104
)。由于将按 year
对新表进行分区,因此,以下过程中的示例语句使用 Presto 函数 substr("date",1,4)
从 date
字段中提取 year
值。
使用 Snappy 压缩将数据转换为 Parquet 格式(按年份进行分区)
-
运行以下 CTAS 语句,并将
your-bucket
替换为您的 Amazon S3 存储桶位置。CREATE table new_parquet WITH (format='PARQUET', parquet_compression='SNAPPY', partitioned_by=array['year'], external_location = 's3://amzn-s3-demo-bucket/optimized-data/') AS SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime, substr("date",1,4) AS year FROM original_csv WHERE cast(substr("date",1,4) AS bigint) >= 2015 AND cast(substr("date",1,4) AS bigint) <= 2019
注意
在此示例中,您创建的表仅包含 2015 年至 2019 年的数据。在步骤 3 中,使用 INSERT INTO 命令将新数据添加到此表。
在查询完成后,请使用以下过程验证您在 CTAS 语句中指定的 Amazon S3 位置中的输出。
查看 CTAS 语句所创建的分区和 parquet 文件
-
要显示创建的分区,请运行以下 AWS CLI 命令。请务必包含最后的正斜杠 (/)。
aws s3 ls s3://amzn-s3-demo-bucket/optimized-data/
输出将显示分区。
PRE year=2015/ PRE year=2016/ PRE year=2017/ PRE year=2018/ PRE year=2019/
-
要查看 Parquet 文件,请运行以下命令。请注意,
|
head -5 选项(该选项将输出限制为前五个结果)在 Windows 上不可用。aws s3 ls s3://amzn-s3-demo-bucket/optimized-data/ --recursive --human-readable | head -5
输出与以下内容类似。
2019-10-31 14:51:05 7.3 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_1be48df2-3154-438b-b61d-8fb23809679d 2019-10-31 14:51:05 7.0 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_2a57f4e2-ffa0-4be3-9c3f-28b16d86ed5a 2019-10-31 14:51:05 9.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_34381db1-00ca-4092-bd65-ab04e06dc799 2019-10-31 14:51:05 7.5 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_354a2bc1-345f-4996-9073-096cb863308d 2019-10-31 14:51:05 6.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_42da4cfd-6e21-40a1-8152-0b902da385a1
步骤 3:使用 INSERT INTO 添加数据
在步骤 2 中,您已使用 CTA 创建一个表,其中包含 2015 年至 2019 年的分区。但原始数据集还包含 2010 年至 2014 年的数据。现在,您使用 INSERT INTO 语句添加该数据。
使用一个或多个 INSERT INTO 语句向表添加数据
-
运行以下 INSERT INTO 命令,并在 WHERE 子句中指定 2015 年之前的年份。
INSERT INTO new_parquet SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime, substr("date",1,4) AS year FROM original_csv WHERE cast(substr("date",1,4) AS bigint) < 2015
-
再次运行
aws s3 ls
命令,并使用以下语法。aws s3 ls s3://amzn-s3-demo-bucket/optimized-data/
输出将显示新分区。
PRE year=2010/ PRE year=2011/ PRE year=2012/ PRE year=2013/ PRE year=2014/ PRE year=2015/ PRE year=2016/ PRE year=2017/ PRE year=2018/ PRE year=2019/
-
要查看通过使用 Parquet 格式的压缩和列式存储获得的数据集的大小减少量,请运行以下命令。
aws s3 ls s3://amzn-s3-demo-bucket/optimized-data/ --recursive --human-readable --summarize
以下结果表明,使用 Snappy 压缩进行 parquet 操作后的数据集的大小为 1.2 GB。
... 2020-01-22 18:12:02 2.8 MiB optimized-data/year=2019/20200122_181132_00003_nja5r_f0182e6c-38f4-4245-afa2-9f5bfa8d6d8f 2020-01-22 18:11:59 3.7 MiB optimized-data/year=2019/20200122_181132_00003_nja5r_fd9906b7-06cf-4055-a05b-f050e139946e Total Objects: 300 Total Size: 1.2 GiB
-
如果将更多的 CSV 数据添加到原始表中,则可使用 INSERT INTO 语句将该数据添加到 parquet 表中。例如,如果您有 2020 年的新数据,则可运行以下 INSERT INTO 语句。该语句可将数据和相关分区添加到
new_parquet
表中。INSERT INTO new_parquet SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime, substr("date",1,4) AS year FROM original_csv WHERE cast(substr("date",1,4) AS bigint) = 2020
注意
INSERT INTO 语句支持向目标表写入最多 100 个分区。不过,要添加 100 个以上的分区,您可以运行多个 INSERT INTO 语句。有关更多信息,请参阅 使用 CTAS 和 INSERT INTO 绕过 100 分区限制。
步骤 4:衡量性能和成本差异
在转换数据后,您可以通过对新表和旧表运行相同的查询并比较结果来衡量性能提升和成本节省。
注意
有关 Athena 每查询成本信息,请参阅 Amazon Athena 定价
衡量性能提升和成本差异
-
对原始表运行以下查询。此查询将查找每个年份值的不同 ID 的数量。
SELECT substr("date",1,4) as year, COUNT(DISTINCT id) FROM original_csv GROUP BY 1 ORDER BY 1 DESC
-
请注意查询运行的时间和扫描的数据量。
-
对新表运行相同的查询,并记下查询运行时和扫描的数据量。
SELECT year, COUNT(DISTINCT id) FROM new_parquet GROUP BY 1 ORDER BY 1 DESC
-
比较结果并计算性能和成本差异。以下示例结果表明,与对旧表执行的测试查询相比,对新表执行的测试查询的速度更快、成本更低。
表 运行时 扫描的数据 原始 16.88 秒 11.35 GB New 3.79 秒 428.05 MB -
对原始表运行以下示例查询。该查询将计算 2018 年地球上的平均最高温度(摄氏度)、平均最低温度(摄氏度)和平均降雨量(毫米)。
SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value FROM original_csv WHERE element IN ('TMIN', 'TMAX', 'PRCP') AND substr("date",1,4) = '2018' GROUP BY 1
-
请注意查询运行的时间和扫描的数据量。
-
对新表运行相同的查询,并记下查询运行时和扫描的数据量。
SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value FROM new_parquet WHERE element IN ('TMIN', 'TMAX', 'PRCP') and year = '2018' GROUP BY 1
-
比较结果并计算性能和成本差异。以下示例结果表明,与对旧表执行的测试查询相比,对新表执行的测试查询的速度更快、成本更低。
表 运行时 扫描的数据 原始 18.65 秒 11.35 GB New 1.92 秒 68 MB
Summary
本主题介绍了如何在 Athena 中使用 CTAS 和 INSERT INTO 语句来执行 ETL 操作。您使用一个 CTAS 语句执行了第一组转换,通过 Snappy 压缩将数据转换为了 Parquet 格式。该 CTAS 语句还将数据集从未分区的数据集转换为已分区的数据集。这减小了其大小,并降低了查询的运行成本。在提供新的数据时,您可以使用 INSERT INTO 语句转换数据并将其加载到使用 CTAS 语句创建的表中。