ETL およびデータ分析での CTAS および INSERT INTO を使用する - Amazon Athena

ETL およびデータ分析での CTAS および INSERT INTO を使用する

Athena で Create Table as Select (CTAS) と INSERT INTO ステートメントを使用して、データ処理のための Amazon S3 へのデータの抽出、変換、ロード (ETL) を行うことができます。このトピックでは、これらのステートメントを使用することで、データセットをパーティション分割して列指向データ形式に変換し、データ分析用に最適化する方法について説明します。

CTAS ステートメントでは、標準の SELECT クエリを使用して新しいテーブルを作成します。CTAS ステートメントを使用して、分析用のデータのサブセットを作成できます。1 つの CTAS ステートメントで、データのパーティション分割、圧縮の指定、Apache Parquet や Apache ORC などの列指向形式へのデータ変換を行うことができます。CTAS クエリを実行すると、それによって作成されるテーブルとパーティションが自動的に AWS Glue Data Catalog に追加されます。これにより、作成した新しいテーブルとパーティションは、その後のクエリですぐに使用できます。

INSERT INTO ステートメントは、ソーステーブルで実行される SELECT クエリステートメントに基づいて、ターゲットテーブルに新しい行を挿入します。INSERT INTO ステートメントを使用すると、CTAS がサポートするすべての変換を使用して、CSV 形式のソーステーブルデータをターゲットテーブルデータに変換およびロードできます。

概要

Athena では、CTAS ステートメントを使用してデータの初期バッチ変換を実行します。次に、複数の INSERT INTO ステートメントを使用して、CTAS ステートメントによって作成されたテーブルに対して増分更新を行います。

ステップ 1: 元のデータセットに基づいてテーブルを作成する

このトピックの例では、一般公開されている NOAA Global Historical Climatology Network Daily (GHCN-D) データセットを使用した、Amazon S3 の読み取り可能なサブセットを使用します。Amazon S3 上のデータには、以下の特徴があります。

Location: s3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/ Total objects: 41727 Size of CSV dataset: 11.3 GB Region: us-east-1

元のデータは、パーティションなしで Amazon S3 に保存されます。データは CSV 形式で、以下のようなファイル内にあります。

2019-10-31 13:06:57 413.1 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0000 2019-10-31 13:06:57 412.0 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0001 2019-10-31 13:06:57 34.4 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0002 2019-10-31 13:06:57 412.2 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0100 2019-10-31 13:06:57 412.7 KiB artifacts/athena-ctas-insert-into-blog/2010.csv0101

このサンプルのファイルサイズは比較的小さくなっています。それらを大きなファイルにマージすることによってファイルの合計数を減らすことができるため、より優れたクエリパフォーマンスが実現されます。CTAS ステートメントと INSERT INTO ステートメントを使用して、クエリのパフォーマンスを向上させることができます。

サンプルデータセットに基づいてデータベースとテーブルを作成する
  1. Athena コンソールで、AWS リージョン リージョンに [US East (N. Virginia)] (米国東部 (バージニア北部)) を選択します。このチュートリアルのクエリは、すべて us-east-1 で実行するようにしてください。

  2. Athena のクエリエディタで、CREATE DATABASE コマンドを実行してデータベースを作成します。

    CREATE DATABASE blogdb
  3. 次のステートメントを実行して、テーブルを作成します。

    CREATE EXTERNAL TABLE `blogdb`.`original_csv` ( `id` string, `date` string, `element` string, `datavalue` bigint, `mflag` string, `qflag` string, `sflag` string, `obstime` bigint) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://aws-bigdata-blog/artifacts/athena-ctas-insert-into-blog/'

ステップ 2: CTAS を使用してデータをパーティション分割、変換、および圧縮する

テーブルを作成したら、1 つの CTAS ステートメントを使用して、データを Snappy 圧縮で Parquet 形式に変換し、データを年ごとにパーティション化できます。

ステップ 1 で作成したテーブルには、日付が YYYYMMDD としてフォーマットされた date フィールド (例: 20100104) があります。新しいテーブルが year でパーティション化されるため、次の手順のサンプルステートメントでは、Presto 関数 substr("date",1,4) を使用して date フィールドから値 year を抽出します。

Snappy 圧縮を使用してデータを Parquet 形式に変換し、年ごとにパーティション分割する
  • your-bucket をお使いの Amazon S3 バケットの場所に置き換えてから、以下の CTAS ステートメントを実行します。

    CREATE table new_parquet WITH (format='PARQUET', parquet_compression='SNAPPY', partitioned_by=array['year'], external_location = 's3://amzn-s3-demo-bucket/optimized-data/') AS SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime, substr("date",1,4) AS year FROM original_csv WHERE cast(substr("date",1,4) AS bigint) >= 2015 AND cast(substr("date",1,4) AS bigint) <= 2019
    注記

    この例では、作成するテーブルには 2015 年から 2019 年のデータのみが含まれます。ステップ 3 では、INSERT INTO コマンドを使用して、このテーブルに新しいデータを追加します。

クエリが完了したら、以下の手順を使用して、CTAS ステートメントで指定した Amazon S3 の場所にある出力を検証します。

CTAS ステートメントによって作成されたパーティションと parquet ファイルを表示する
  1. 作成されたパーティションを表示するには、次の AWS CLI コマンドを実行します。必ず、最後にスラッシュ (/) を含めてください。

    aws s3 ls s3://amzn-s3-demo-bucket/optimized-data/

    出力はパーティションを示しています。

    PRE year=2015/ PRE year=2016/ PRE year=2017/ PRE year=2018/ PRE year=2019/
  2. Parquet ファイルを表示するには、以下のコマンドを実行します。出力を最初の 5 つの結果に制限する | head-5 オプションは、Windows では使用できません。

    aws s3 ls s3://amzn-s3-demo-bucket/optimized-data/ --recursive --human-readable | head -5

    出力は以下のようになります。

    2019-10-31 14:51:05 7.3 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_1be48df2-3154-438b-b61d-8fb23809679d 2019-10-31 14:51:05 7.0 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_2a57f4e2-ffa0-4be3-9c3f-28b16d86ed5a 2019-10-31 14:51:05 9.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_34381db1-00ca-4092-bd65-ab04e06dc799 2019-10-31 14:51:05 7.5 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_354a2bc1-345f-4996-9073-096cb863308d 2019-10-31 14:51:05 6.9 MiB optimized-data/year=2015/20191031_215021_00001_3f42d_42da4cfd-6e21-40a1-8152-0b902da385a1

ステップ 3: INSERT INTO を使用してデータを追加する

ステップ 2 では、CTAS を使用して 2015 年から 2019 年までのデータのパーティションを持つテーブルを作成しました。しかし、元のデータセットには、2010 年から 2014 年までのデータも含まれています。次に、INSERT INTO ステートメントを使用してそのデータを追加します。

1 つ以上の INSERT INTO ステートメントを使用してテーブルにデータを追加する
  1. WHERE 句で 2015 年より前の年を指定して、以下の INSERT INTO コマンドを実行します。

    INSERT INTO new_parquet SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime, substr("date",1,4) AS year FROM original_csv WHERE cast(substr("date",1,4) AS bigint) < 2015
  2. 次の構文を使用して aws s3 ls コマンドを再度実行します。

    aws s3 ls s3://amzn-s3-demo-bucket/optimized-data/

    出力には、新しいパーティションが表示されます。

    PRE year=2010/ PRE year=2011/ PRE year=2012/ PRE year=2013/ PRE year=2014/ PRE year=2015/ PRE year=2016/ PRE year=2017/ PRE year=2018/ PRE year=2019/
  3. Parquet 形式の圧縮と列指向ストレージを使用して取得したデータセットのサイズが小さくなっていることを確認するには、以下のコマンドを実行します。

    aws s3 ls s3://amzn-s3-demo-bucket/optimized-data/ --recursive --human-readable --summarize

    以下の結果は、Snappy 圧縮を使用した parquet 後のデータセットのサイズが 1.2 GB であることを示しています。

    ... 2020-01-22 18:12:02 2.8 MiB optimized-data/year=2019/20200122_181132_00003_nja5r_f0182e6c-38f4-4245-afa2-9f5bfa8d6d8f 2020-01-22 18:11:59 3.7 MiB optimized-data/year=2019/20200122_181132_00003_nja5r_fd9906b7-06cf-4055-a05b-f050e139946e Total Objects: 300 Total Size: 1.2 GiB
  4. さらに多くの CSV データが元のテーブルに追加されている場合は、INSERT INTO ステートメントを使用して、そのデータを parquet テーブルに追加できます。たとえば、2020 年の新しいデータがある場合は、以下の INSERT INTO ステートメントを実行できます。このステートメントは、データおよび関連するパーティションを new_parquet テーブルに追加します。

    INSERT INTO new_parquet SELECT id, date, element, datavalue, mflag, qflag, sflag, obstime, substr("date",1,4) AS year FROM original_csv WHERE cast(substr("date",1,4) AS bigint) = 2020
    注記

    この INSERT INTO ステートメントは、最大 100 個のパーティションの宛先テーブルへの書き込みをサポートします。ただし、100 個を超えるパーティションを追加するには、複数の INSERT INTO ステートメントを実行できます。詳細については、「CTAS および INSERT INTO を使用して 100 パーティションの制限を回避する」を参照してください。

ステップ 4: パフォーマンスとコストの差を測定する

データを変換した後、新しいテーブルと古いテーブルで同じクエリを実行し、結果を比較することで、パフォーマンスの向上とコスト削減を測定できます。

注記

Athena の クエリごとのコストについては、Amazon Athena の料金表を参照してください。

パフォーマンスの向上とコストの差を測定する
  1. 元のテーブルで以下のクエリを実行します。このクエリにより、年のすべての値の個別の ID の数が検出されます。

    SELECT substr("date",1,4) as year, COUNT(DISTINCT id) FROM original_csv GROUP BY 1 ORDER BY 1 DESC
  2. クエリが実行された時刻とスキャンされたデータの量を確認します。

  3. 新しいテーブルで同じクエリを実行し、クエリの実行時間とスキャンされたデータの量をメモします。

    SELECT year, COUNT(DISTINCT id) FROM new_parquet GROUP BY 1 ORDER BY 1 DESC
  4. 結果を比較し、パフォーマンスとコストの差を計算します。以下のサンプル結果は、新しいテーブルのテストクエリが古いテーブルのクエリよりも高速で安価であることを示しています。

    実行時間 スキャンされたデータ
    16.88 秒 11.35 GB
    新規 3.79 秒 428.05 MB
  5. 元のテーブルで次のサンプルクエリを実行します。このクエリでは、2018 年の地球の平均最高温度 (摂氏)、平均最低気温 (摂氏)、平均降水量 (mm) を計算します。

    SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value FROM original_csv WHERE element IN ('TMIN', 'TMAX', 'PRCP') AND substr("date",1,4) = '2018' GROUP BY 1
  6. クエリが実行された時刻とスキャンされたデータの量を確認します。

  7. 新しいテーブルで同じクエリを実行し、クエリの実行時間とスキャンされたデータの量をメモします。

    SELECT element, round(avg(CAST(datavalue AS real)/10),2) AS value FROM new_parquet WHERE element IN ('TMIN', 'TMAX', 'PRCP') and year = '2018' GROUP BY 1
  8. 結果を比較し、パフォーマンスとコストの差を計算します。以下のサンプル結果は、新しいテーブルのテストクエリが古いテーブルのクエリよりも高速で安価であることを示しています。

    実行時間 スキャンされたデータ
    18.65 秒 11.35 GB
    新規 1.92 秒 68 MB

[概要]

このトピックでは、Athena で CTAS と INSERT INTO ステートメントを使用して ETL オペレーションを実行する方法を説明しました。最初の変換セットは、データを Snappy 圧縮で Parquet 形式に変換する CTAS ステートメントを使用して実行しました。CTAS ステートメントは、データセットのパーティション分割されていないものからパーティション分割されたものへの変換も実行しました。これにより、サイズが小さくなり、クエリの実行コストが削減されました。新しいデータが使用可能になったら、INSERT INTO ステートメントを使用して、CTAS ステートメントで作成したテーブルにデータを変換およびロードできます。