ETL およびデータ分析での CTAS および INSERT INTO を使用する
Athena で Create Table as Select (CTAS) と INSERT INTO ステートメントを使用して、データ処理のための Amazon S3 へのデータの抽出、変換、ロード (ETL) を行うことができます。このトピックでは、これらのステートメントを使用することで、データセットをパーティション分割して列指向データ形式に変換し、データ分析用に最適化する方法について説明します。
CTAS ステートメントでは、標準の SELECT クエリを使用して新しいテーブルを作成します。CTAS ステートメントを使用して、分析用のデータのサブセットを作成できます。1 つの CTAS ステートメントで、データのパーティション分割、圧縮の指定、Apache Parquet や Apache ORC などの列指向形式へのデータ変換を行うことができます。CTAS クエリを実行すると、それによって作成されるテーブルとパーティションが自動的に AWS Glue Data Catalog
INSERT INTO ステートメントは、ソーステーブルで実行される SELECT クエリステートメントに基づいて、ターゲットテーブルに新しい行を挿入します。INSERT INTO ステートメントを使用すると、CTAS がサポートするすべての変換を使用して、CSV 形式のソーステーブルデータをターゲットテーブルデータに変換およびロードできます。
概要
Athena では、CTAS ステートメントを使用してデータの初期バッチ変換を実行します。次に、複数の INSERT INTO ステートメントを使用して、CTAS ステートメントによって作成されたテーブルに対して増分更新を行います。
ステップ
ステップ 1: 元のデータセットに基づいてテーブルを作成する
このトピックの例では、一般公開されている NOAA Global Historical Climatology Network Daily (GHCN-D)
Location: s3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/ Total objects: 41727 Size of CSV dataset: 11.3 GB Region: us-east-1
元のデータは、パーティションなしで Amazon S3 に保存されます。データは CSV 形式で、以下のようなファイル内にあります。
2019-10-31 13:06:57 413.1 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0000 2019-10-31 13:06:57 412.0 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0001 2019-10-31 13:06:57 34.4 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0002 2019-10-31 13:06:57 412.2 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0100 2019-10-31 13:06:57 412.7 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0101
このサンプルのファイルサイズは比較的小さくなっています。それらを大きなファイルにマージすることによってファイルの合計数を減らすことができるため、より優れたクエリパフォーマンスが実現されます。CTAS ステートメントと INSERT INTO ステートメントを使用して、クエリのパフォーマンスを向上させることができます。
サンプルデータセットに基づいてデータベースとテーブルを作成する
-
Athena コンソールで、AWS リージョン リージョンに [US East (N. Virginia)] (米国東部 (バージニア北部)) を選択します。このチュートリアルのクエリは、すべて
us-east-1
で実行するようにしてください。 -
Athena のクエリエディタで、CREATE DATABASE コマンドを実行してデータベースを作成します。
CREATE DATABASE blogdb
-
次のステートメントを実行して、テーブルを作成します。
CREATE EXTERNAL TABLE `blogdb`.`original_csv` ( `id` string, `date` string, `element` string, `datavalue` bigint, `mflag` string, `qflag` string, `sflag` string, `obstime` bigint) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/'
ステップ 2: CTAS を使用してデータをパーティション分割、変換、および圧縮する
テーブルを作成したら、1 つの CTAS ステートメントを使用して、データを Snappy 圧縮で Parquet 形式に変換し、データを年ごとにパーティション化できます。
ステップ 1 で作成したテーブルには、日付が YYYYMMDD
としてフォーマットされた date
フィールド (例: 20100104
) があります。新しいテーブルが year
でパーティション化されるため、次の手順のサンプルステートメントでは、Presto 関数 substr("date",1,4)
を使用して date
フィールドから値 year
を抽出します。
Snappy 圧縮を使用してデータを Parquet 形式に変換し、年ごとにパーティション分割する
-
your-bucket
をお使いの Amazon S3 バケットの場所に置き換えてから、以下の CTAS ステートメントを実行します。CREATE table new_parquet WITH (format='PARQUET', parquet_compression='SNAPPY', partitioned_by=array['year'], external_location = 's3://amzn-s3-demo-bucket/optimized-data/') AS SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime, substr("date",1,4) AS year FROM original_csv WHERE cast(substr("date",1,4) AS bigint) >= 2015 AND cast(substr("date",1,4) AS bigint) <= 2019
注記
この例では、作成するテーブルには 2015 年から 2019 年のデータのみが含まれます。ステップ 3 では、INSERT INTO コマンドを使用して、このテーブルに新しいデータを追加します。
クエリが完了したら、以下の手順を使用して、CTAS ステートメントで指定した Amazon S3 の場所にある出力を検証します。
CTAS ステートメントによって作成されたパーティションと parquet ファイルを表示する
-
作成されたパーティションを表示するには、次の AWS CLI コマンドを実行します。必ず、最後にスラッシュ (/) を含めてください。
aws s3 ls s3://amzn-s3-demo-bucket/optimized-data/
出力はパーティションを示しています。
PRE year=2015/ PRE year=2016/ PRE year=2017/ PRE year=2018/ PRE year=2019/
-
Parquet ファイルを表示するには、以下のコマンドを実行します。出力を最初の 5 つの結果に制限する
|
head-5 オプションは、Windows では使用できません。aws s3 ls s3://amzn-s3-demo-bucket/optimized-data/ --recursive --human-readable | head -5
出力は以下のようになります。
2019-10-31 14:51:05 7.3 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_1be48df2-3154-438b-b61d-8fb23809679d 2019-10-31 14:51:05 7.0 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_2a57f4e2-ffa0-4be3-9c3f-28b16d86ed5a 2019-10-31 14:51:05 9.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_34381db1-00ca-4092-bd65-ab04e06dc799 2019-10-31 14:51:05 7.5 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_354a2bc1-345f-4996-9073-096cb863308d 2019-10-31 14:51:05 6.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_42da4cfd-6e21-40a1-8152-0b902da385a1
ステップ 3: INSERT INTO を使用してデータを追加する
ステップ 2 では、CTAS を使用して 2015 年から 2019 年までのデータのパーティションを持つテーブルを作成しました。しかし、元のデータセットには、2010 年から 2014 年までのデータも含まれています。次に、INSERT INTO ステートメントを使用してそのデータを追加します。
1 つ以上の INSERT INTO ステートメントを使用してテーブルにデータを追加する
-
WHERE 句で 2015 年より前の年を指定して、以下の INSERT INTO コマンドを実行します。
INSERT INTO new_parquet SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime, substr("date",1,4) AS year FROM original_csv WHERE cast(substr("date",1,4) AS bigint) < 2015
-
次の構文を使用して
aws s3 ls
コマンドを再度実行します。aws s3 ls s3://amzn-s3-demo-bucket/optimized-data/
出力には、新しいパーティションが表示されます。
PRE year=2010/ PRE year=2011/ PRE year=2012/ PRE year=2013/ PRE year=2014/ PRE year=2015/ PRE year=2016/ PRE year=2017/ PRE year=2018/ PRE year=2019/
-
Parquet 形式の圧縮と列指向ストレージを使用して取得したデータセットのサイズが小さくなっていることを確認するには、以下のコマンドを実行します。
aws s3 ls s3://amzn-s3-demo-bucket/optimized-data/ --recursive --human-readable --summarize
以下の結果は、Snappy 圧縮を使用した parquet 後のデータセットのサイズが 1.2 GB であることを示しています。
... 2020-01-22 18:12:02 2.8 MiB optimized-data/year=2019/20200122_181132_00003_nja5r_f0182e6c-38f4-4245-afa2-9f5bfa8d6d8f 2020-01-22 18:11:59 3.7 MiB optimized-data/year=2019/20200122_181132_00003_nja5r_fd9906b7-06cf-4055-a05b-f050e139946e Total Objects: 300 Total Size: 1.2 GiB
-
さらに多くの CSV データが元のテーブルに追加されている場合は、INSERT INTO ステートメントを使用して、そのデータを parquet テーブルに追加できます。たとえば、2020 年の新しいデータがある場合は、以下の INSERT INTO ステートメントを実行できます。このステートメントは、データおよび関連するパーティションを
new_parquet
テーブルに追加します。INSERT INTO new_parquet SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime, substr("date",1,4) AS year FROM original_csv WHERE cast(substr("date",1,4) AS bigint) = 2020
注記
この INSERT INTO ステートメントは、最大 100 個のパーティションの宛先テーブルへの書き込みをサポートします。ただし、100 個を超えるパーティションを追加するには、複数の INSERT INTO ステートメントを実行できます。詳細については、「CTAS および INSERT INTO を使用して 100 パーティションの制限を回避する」を参照してください。
ステップ 4: パフォーマンスとコストの差を測定する
データを変換した後、新しいテーブルと古いテーブルで同じクエリを実行し、結果を比較することで、パフォーマンスの向上とコスト削減を測定できます。
注記
Athena の クエリごとのコストについては、Amazon Athena の料金表
パフォーマンスの向上とコストの差を測定する
-
元のテーブルで以下のクエリを実行します。このクエリにより、年のすべての値の個別の ID の数が検出されます。
SELECT substr("date",1,4) as year, COUNT(DISTINCT id) FROM original_csv GROUP BY 1 ORDER BY 1 DESC
-
クエリが実行された時刻とスキャンされたデータの量を確認します。
-
新しいテーブルで同じクエリを実行し、クエリの実行時間とスキャンされたデータの量をメモします。
SELECT year, COUNT(DISTINCT id) FROM new_parquet GROUP BY 1 ORDER BY 1 DESC
-
結果を比較し、パフォーマンスとコストの差を計算します。以下のサンプル結果は、新しいテーブルのテストクエリが古いテーブルのクエリよりも高速で安価であることを示しています。
表 実行時間 スキャンされたデータ 元 16.88 秒 11.35 GB 新規 3.79 秒 428.05 MB -
元のテーブルで次のサンプルクエリを実行します。このクエリでは、2018 年の地球の平均最高温度 (摂氏)、平均最低気温 (摂氏)、平均降水量 (mm) を計算します。
SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value FROM original_csv WHERE element IN ('TMIN', 'TMAX', 'PRCP') AND substr("date",1,4) = '2018' GROUP BY 1
-
クエリが実行された時刻とスキャンされたデータの量を確認します。
-
新しいテーブルで同じクエリを実行し、クエリの実行時間とスキャンされたデータの量をメモします。
SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value FROM new_parquet WHERE element IN ('TMIN', 'TMAX', 'PRCP') and year = '2018' GROUP BY 1
-
結果を比較し、パフォーマンスとコストの差を計算します。以下のサンプル結果は、新しいテーブルのテストクエリが古いテーブルのクエリよりも高速で安価であることを示しています。
表 実行時間 スキャンされたデータ 元 18.65 秒 11.35 GB 新規 1.92 秒 68 MB
[概要]
このトピックでは、Athena で CTAS と INSERT INTO ステートメントを使用して ETL オペレーションを実行する方法を説明しました。最初の変換セットは、データを Snappy 圧縮で Parquet 形式に変換する CTAS ステートメントを使用して実行しました。CTAS ステートメントは、データセットのパーティション分割されていないものからパーティション分割されたものへの変換も実行しました。これにより、サイズが小さくなり、クエリの実行コストが削減されました。新しいデータが使用可能になったら、INSERT INTO ステートメントを使用して、CTAS ステートメントで作成したテーブルにデータを変換およびロードできます。