データを Apache Parquet に変換するための AWS 3 つの Glue ETLジョブタイプ - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

データを Apache Parquet に変換するための AWS 3 つの Glue ETLジョブタイプ

作成者: Adnan Alvee (AWS)、Karthikeyan Ramachandran、および Nith Govindasivan (AWS)

環境:PoC またはパイロット

テクノロジー: 分析

ワークロード:その他すべてのワークロード

AWS サービス: AWS Glue

[概要]

Amazon Web Services (AWS) クラウドでは、Glue AWS はフルマネージドの抽出、変換、ロード (ETL) サービスです。AWS Glue を使用すると、データを分類し、クリーンアップし、充実させ、さまざまなデータストアとデータストリーム間で確実に移動させることがコスト効率良くなります。

このパターンは Glue AWS で異なるジョブタイプを提供し、3 つの異なるスクリプトを使用してオーサリングETLジョブを示します。

AWS Glue を使用して、Python シェル環境でETLジョブを記述できます。マネージド Apache Spark 環境で Python (PySpark) または Scala を使用して、バッチジョブとストリーミングETLジョブの両方を作成することもできます。オーサリングETLジョブを開始するために、このパターンは Python シェル PySpark、および Scala を使用したバッチETLジョブに焦点を当てています。Python シェルジョブは、より少ない計算能力を必要とするワークロードを対象としています。マネージド Apache Spark 環境は、高い計算能力を必要とするワークロードを対象としています。

Apache Parquet は、効率的な圧縮とエンコードスキームをサポートするように構築されています。データを列指向に保存するため、分析ワークロードを高速化できます。データを Parquet に変換すると、長期的にはストレージ容量、コスト、時間を節約できます。Parquet について詳しくは、ブログ記事「Apache Parquet:オープンソースの列指向データ形式でヒーローになる方法」をご覧ください。

前提条件と制限

前提条件

  • AWS Identity and Access Management (IAM) ロール (ロールがない場合は、「追加情報」セクションを参照してください)。

アーキテクチャ

ターゲットテクノロジースタック

  • AWS Glue

  • Amazon Simple Storage Service (Amazon S3)

  • Apache Parquet

自動化とスケール

  • AWS Glue ワークフローは、ETLパイプラインの完全な自動化をサポートします。

  • データ処理ユニット (DPUs) またはワーカータイプの数を変更して、水平方向および垂直方向にスケーリングできます。

ツール

AWS サービス

  • Amazon Simple Storage Service (Amazon S3) は、どのようなデータ量であっても、データを保存、保護、取得することを支援するクラウドベースのオブジェクトストレージサービスです。

  • AWS Glue は、さまざまなデータストアとデータストリーム間でデータを分類、クリーニング、強化、移動するためのフルマネージドETLサービスです。

その他のツール

  • Apache Parquet」は、ストレージとデータの取得を目的として設計されたオープンソースの列指向データファイル形式です。

設定

Glue AWS の計算能力を設定するには、次の設定を使用しますETL。コストを削減するには、このパターンで提供されるワークロードを実行するときには最小限の設定を使用してください。 

  • Python シェル – 1 を使用して 16 GB のメモリDPUを使用するか、0.0625 DPUを使用して 1 GB のメモリを使用できます。このパターンは 0.0625 を使用します。これは DPUGlue AWS コンソールのデフォルトです。

  • Python または Scala for Spark – コンソールで Spark 関連のジョブタイプを選択した場合、Glue AWS はデフォルトで 10 個のワーカーと G.1X ワーカータイプを使用します。このパターンでは、2 つのワーカー (許容される最小数) と標準のワーカータイプを使用するため、十分かつ費用対効果が高くなります。

次の表は、Apache Spark 環境のさまざまな AWS Glue ワーカータイプを示しています。Python シェルジョブは Apache Spark 環境を使用して Python を実行しないため、このテーブルには含まれていません。

標準

G.1X

G.2X

vCPU

4

4

8

「メモリ」

16 GB

16 GB

32 GB

ディスク容量

50 GB

64 GB

128 GB

ワーカーごとのエグゼキューター

2

1

コード

IAM ロールやパラメータ設定など、このパターンで使用されるコードについては、「追加情報」セクションを参照してください。

エピック

タスク説明必要なスキル

データを新規または既存の S3 バケットにアップロードします。

お使いののアカウント内で新しい S3 バケットを作成するか、既存の S3 バケットを使用します。「添付ファイル」セクションから sample_data.csv ファイルをアップロードし、S3 バケットとプレフィックスの場所を書き留めます。

全般 AWS
タスク説明必要なスキル

Glue AWS ジョブを作成します。

Glue AWS コンソールの ETLセクションで、Glue AWS ジョブを追加します。適切なジョブタイプ、Glue AWS バージョン、対応する DPU/Worker タイプとワーカー数を選択します。詳細については、「設定」セクションを参照してください。

デベロッパー、クラウド、またはデータ

入力位置と出力位置を変更します。

Glue AWS ジョブに対応するコードをコピーし、データエピックのアップロードでメモした入出力場所を変更します。

デベロッパー、クラウド、またはデータ

パラメータを設定します。

追加情報セクションで提供されているスニペットを使用して、ETLジョブのパラメータを設定できます。AWS Glue は内部的に 4 つの引数名を使用します。

  • --conf

  • --debug

  • --mode

  • --JOB_NAME

--JOB_NAME パラメータは Glue AWS コンソールに明示的に入力する必要があります。[ジョブ]、[ジョブの編集]、[セキュリティ設定]、[スクリプトライブラリ]、および [ジョブパラメータ] (オプション) を選択します。--JOB_NAME キーとして入力し、値を提供します。AWS コマンドラインインターフェイス (AWS CLI) または AWS Glue を使用してAPI、このパラメータを設定することもできます。--JOB_NAME パラメーターは Spark によって使用され、Python シェル環境のジョブでは必要ありません。

-- をすべてのパラメーター名の前に追加する必要があります。そうしないと、コードは機能しません。たとえば、コードスニペットの場合、ロケーションパラメータは --input_loc--output_loc によって呼び出される必要があります。

デベロッパー、クラウド、またはデータ

ETL ジョブを実行します。

ジョブを実行し、出力を確認します。元のファイルからどれだけの容量が削減されたかに注意してください。

デベロッパー、クラウド、またはデータ

関連リソース

リファレンス

チュートリアルと動画

追加情報

IAM ロール

AWS Glue ジョブを作成するときは、次のコードスニペットに示すアクセス許可を持つ既存のIAMロールまたは新しいロールを使用できます。

新しいロールを作成するには、次のYAMLコードを使用します。

# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer # Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc. AWSTemplateFormatVersion: "2010-09-09" Description: This template will setup IAM role for AWS Glue service. Resources: rGlueRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "s3:PutObject" - "s3:GetObject" Resource: "arn:aws:s3:::*/*" Tags: - Key : "Name" Value : !Sub "${AWS::StackName}" Outputs: oGlueRoleName: Description: AWS Glue IAM role Value: Ref: rGlueRole Export: Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]

AWS Glue Python シェル

Python コードは Pandas と PyArrow ライブラリを使用してデータを Parquet に変換します。Pandas ライブラリは既に使用可能です。 PyArrow ライブラリは 1 回限りの実行であるため、パターンの実行時にダウンロードされます。ホイールファイルを使用してライブラリ PyArrow に変換し、ファイルをライブラリパッケージとして指定できます。wheel ファイルのパッケージングについての詳細は、「独自の Python ライブラリの提供」を参照してください。

AWS Glue Python シェルパラメータ

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])

AWS Glue Python シェルコード

from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1]  s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False)  s3_resource.Object(output_loc_bucket, output_loc_prefix +  'data' + '.parquet').put(Body=parquet_buffer.getvalue())

AWS Python を使用した Glue Spark ジョブ

Python で AWS Glue Spark ジョブタイプを使用するには、ジョブタイプとして Spark を選択します。Spark 3.1、ジョブの起動時間を改善した Python 3 (Glue バージョン 3.0) を AWS Glue バージョンとして選択します。

AWS Glue Python パラメータ

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])

AWS Python コードを使用した Glue Spark ジョブ

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\     connection_type = "s3", \     connection_options = {          "paths": [input_loc]}, \     format = "csv",     format_options={         "withHeader": True,         "separator": ","     }) outputDF = glueContext.write_dynamic_frame.from_options(\     frame = inputDyf, \     connection_type = "s3", \     connection_options = {"path": output_loc \         }, format = "parquet")    

圧縮されたサイズの大きいファイルが多数ある場合 (たとえば、1,000 個のファイルがそれぞれ約 3 MB)、次のコードに示すように、recurse パラメータと compressionType パラメータを組み合わせてプレフィックス内にあるすべてのファイルを読み取ります。

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

圧縮された小さなファイルが多数ある場合 (たとえば、それぞれが約 133 KB の 1,000 ファイル)、compressionType パラメーターと recurse パラメーターとともに、groupFiles パラメーターを使用してください。groupFiles パラメータは小さなファイルを複数の大きなファイルにグループ化し、groupSize パラメータは指定されたサイズ (たとえば 1 MB) にグループ化を制御します。次のコードスニペットは、コード内でこれらのパラメーターを使用する例を示しています。

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", "groupFiles" :"inPartition", "groupSize" :"1048576", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

ワーカーノードに変更を加えることなく、これらの設定により、Glue AWS ジョブは複数のファイル (大小、圧縮の有無にかかわらず) を読み取って、Parquet 形式でターゲットに書き込むことができます。

AWS Scala での Glue Spark ジョブ

Scala で AWS Glue Spark ジョブタイプを使用するには、ジョブタイプとして SparkScala として 言語 を選択します。ジョブの起動時間が改善された Spark 3.1、Scala 2 (Glue バージョン 3.0) AWS を Glue バージョンとして選択します。ストレージスペースを節約するために、次の AWS Glue with Scala サンプルでは、 applyMapping機能を使用してデータ型を変換します。

AWS Glue Scala パラメータ

import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)

AWS Scala コードを使用した Glue Spark ジョブ

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp {   def main(sysArgs: Array[String]) {          @transient val spark: SparkContext = SparkContext.getOrCreate()     val glueContext: GlueContext = new GlueContext(spark)     val inputLoc = "s3://bucket-name/prefix/sample_data.csv"     val outputLoc = "s3://bucket-name/prefix/"     val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame()     val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"),     ("_c2", "string", "profit", "double")), caseSensitive = false)     val formatPartition = applyMapping.toDF().coalesce(1)     val dynamicFrame = DynamicFrame(formatPartition, glueContext)     val dataSink = glueContext.getSinkWithFormat(         connectionType = "s3",          options = JsonOptions(Map("path" -> outputLoc )),         transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame)   } }

添付ファイル

このドキュメントに関連する追加コンテンツにアクセスするには、次のファイルを解凍してください。「attachment.zip