チュートリアル: AWS Glue for Spark スクリプトの記述 - AWS Glue

チュートリアル: AWS Glue for Spark スクリプトの記述

このチュートリアルでは、AWS Glue スクリプトを記述するプロセスの概要を説明します。スクリプトを、ジョブを使用してスケジュールに従って実行したり、インタラクティブセッションを使用してインタラクティブに実行したりできます。ジョブの詳細については、「AWS Glue Studio でビジュアル ETL ジョブを作成する」を参照してください。詳細については、「AWS Glue インタラクティブセッションの概要」を参照してください。

AWS Glue Studio ビジュアルエディターは、AWS Glue ジョブを構築するためのグラフィカルなノーコードのインターフェイスを提供します。AWSGlue スクリプトはビジュアルジョブをバックアップします。Glue スクリプトを使用すると、Apache Spark プログラムを操作するための拡張された一連のツールにアクセスできます。AWS Glue スクリプト内からの抽出、変換、ロード (ETL) ワークフローを円滑にする AWS Glue ライブラリだけでなく、Spark の ネイティブ API にもアクセスできます。

このチュートリアルでは、駐車チケットのデータセットを抽出、変換、ロードします。この作業を行うスクリプトは、AWS Glue Studio ビジュアルエディタを紹介する AWS ビッグデータブログの「AWS Glue Studio で ETL を簡単にする」で作成したものと形式および機能が同じです。このスクリプトをジョブ内で実行することで、AWS Glue ETL スクリプトがどのように機能するかをビジュアルジョブと比較して確認できます。これにより、ビジュアルジョブでまだ使用可能ではない追加機能を使用する準備ができます。

このチュートリアルでは、Python 言語とライブラリを使用しています。Scala でも同様の機能が使用可能です。このチュートリアルを完了すると、Scala のサンプルスクリプトを生成して検査し、Scala での AWS Glue ETL スクリプトの記述プロセスを実行する方法を理解できるようになります。

前提条件

このチュートリアルには、次のような前提条件があります。

  • AWS CloudFormation テンプレートを実行するように指示する AWS Glue Studio のブログ記事と同じ前提条件。

    このテンプレートは、AWS Glue データカタログを使用して、s3://aws-bigdata-blog/artifacts/gluestudio/ で使用できる駐車券データセットを管理します。これによって参照される以下のリソースが作成されます。

  • AWS Glue StudioRole - AWS Glue ジョブを実行する IAM ロール

  • AWS Glue StudioAmazon S3Bucket - ブログ関連のファイルを保存する Amazon S3 バケットの名前

  • AWS Glue StudioTicketsYYZDB – AWS Glue データカタログデータベース

  • AWS Glue StudioTableTickets - ソースとして使用するデータカタログテーブル

  • AWS Glue StudioTableTrials - ソースとして使用するデータカタログテーブル

  • AWS Glue StudioParkingTicketCount - 送信先として使用するデータカタログテーブル

  • AWS Glue Studio のブログ記事で生成されたスクリプト。ブログ記事が変更された場合、スクリプトは次のテキストからも入手できます。

サンプルスクリプトの生成

AWS Glue Studio ビジュアルエディタを強力なコード生成ツールとして使用することで、記述するスクリプトの足場を構築できます。サンプルスクリプトは、このツールを使用して作成します。

これらのステップをスキップしたい場合は、提供されているスクリプトを使用してください。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node S3 bucket S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog( database="yyz-tickets", table_name="tickets", transformation_ctx="S3bucket_node1" ) # Script generated for node ApplyMapping ApplyMapping_node2 = ApplyMapping.apply( frame=S3bucket_node1, mappings=[ ("tag_number_masked", "string", "tag_number_masked", "string"), ("date_of_infraction", "string", "date_of_infraction", "string"), ("ticket_date", "string", "ticket_date", "string"), ("ticket_number", "decimal", "ticket_number", "float"), ("officer", "decimal", "officer_name", "decimal"), ("infraction_code", "decimal", "infraction_code", "decimal"), ("infraction_description", "string", "infraction_description", "string"), ("set_fine_amount", "decimal", "set_fine_amount", "float"), ("time_of_infraction", "decimal", "time_of_infraction", "decimal"), ], transformation_ctx="ApplyMapping_node2", ) # Script generated for node S3 bucket S3bucket_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2, connection_type="s3", format="glueparquet", connection_options={"path": "s3://DOC-EXAMPLE-BUCKET", "partitionKeys": []}, format_options={"compression": "gzip"}, transformation_ctx="S3bucket_node3", ) job.commit()
サンプルスクリプトを生成するには
  1. AWS Glue Studio のチュートリアルをを完了します。このチュートリアルを完了するには、「サンプルジョブから AWS Glue Studio でジョブの作成」を参照してください。

  2. 次のスクリーンショットに示すように、ジョブページの [Script] (スクリプト) タブに移動します。

    AWS Glue ジョブの [Script] (スクリプト) タブ。
  3. [Script] (スクリプト) タブのすべての内容をコピーします。[Job details] (Job の詳細) でスクリプト言語を設定することで、生成するコードを Python と Scala の間で自由に切り替えることができます。

ステップ 1. ジョブを作成してスクリプトを貼り付ける

このステップでは、AWS Management Consoleで AWS Glue ジョブを作成します。これにより、AWS Glue でスクリプトを実行するのに必要な構成がセットアップされます。また、スクリプトを保存および編集するための場所も作成されます。

ジョブを作成するには
  1. AWS Management Consoleで、AWS Glue のランディングページに移動します。

  2. サイドナビゲーションペインで、[Jobs] (ジョブ) を選択します。

  3. [Create job] (ジョブの作成) で [Spark script editor] (Spark スクリプトエディタ) を選択してから、[Create] (作成) を選択します。

  4. オプション — スクリプトの全文を [Script] (スクリプト) ペインに貼り付けます。この代わりに、チュートリアルに従うこともできます。

ステップ 2. AWS Glue ライブラリをインポートする

スクリプトの外部で定義されたコードや設定とやり取りするには、スクリプトの設定が必要です。この作業は、AWS Glue Studioの背後で実行されます。

このステップでは、次のアクションを実行します。

  • GlueContext オブジェクトをインポートして初期化します。スクリプト作成の観点から見ると、このインポートが最も重要なものとなります。これにより、ETL スクリプトの開始点となる、ソースデータセットとターゲットデータセットを定義するための標準的な方法が表示されます。GlueContext クラスの詳細については、「GlueContext クラス」を参照してください。

  • SparkContext および SparkSession を初期化します。これにより、AWS Glue ジョブ 内部で Spark エンジンを使用するための設定が可能になります。入門用 AWS Glue スクリプトの中では、これらを直接使用する必要はありません

  • getResolvedOptions を呼び出し、スクリプト内でジョブ引数を使用するための準備を行います。ジョブパラメータの解決方法の詳細については、「getResolvedOptions を使用して、パラメータにアクセスする」を参照してください。

  • Job を初期化します。Job オブジェクトにより設定が行われ、AWS Glue のさまざまなオプションの機能の状態が追跡されます。スクリプトは Job オブジェクトがなくても実行できますが、ベストプラクティスとして、後にこの機能が統合された場合に混乱が生じないようにオブジェクトを初期化しておきます。

    ジョブのブックマークは、これらの機能の 1 つであり、このチュートリアルではオプションで設定可能です。ジョブのブックマークに関する説明は、「オプション — ジョブのブックマークを有効にする」のセクションで確認できます。

この手順では、次のコードを作成します。このコードは、生成されるサンプルスクリプトの一部です。

from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args)
AWS Glue ライブラリをインポートするには
  • コードのこのセクションをコピーして、[Script] (スクリプト) エディタに貼り付けます。

    注記

    コードのコピーは技術的に推奨されないと思われているかもしれません。ただ、このチュートリアルでは、AWS Glue ETL スクリプト全体で主要な変数に一貫して名前を付けることが推奨されるため、これを提案しています。

ステップ 3. ソースからデータを抽出する

すべての ETL プロセスにおいて、変更するソースデータセットを最初に定義する必要があります。AWS Glue Studio ビジュアルエディタでは、[Source] (ソース) ノードを作成することで、この情報を指定します。

このステップでは、AWS Glue データカタログで設定されたソースからデータを抽出する create_dynamic_frame.from_catalog メソッドに database および table_name を提供します。

GlueContext オブジェクトは、前のステップで初期化が済んでいます。このオブジェクトを使用して、ソースの設定に使用される create_dynamic_frame.from_catalog などのメソッドを検索します。

この手順では、create_dynamic_frame.from_catalog を使用して次のコードを記述します。このコードは、生成されるサンプルスクリプトの一部です。

S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog( database="yyz-tickets", table_name="tickets", transformation_ctx="S3bucket_node1" )
ソースからデータを抽出するには
  1. AWS Glue データカタログで定義されているソースからデータを抽出するには、ドキュメントを参照し GlueContext にあるメソッドを見つけます。これらの方法については、「GlueContext クラス」を参照してください。create_dynamic_frame.from_catalog メソッドを選択します。glueContext でこのメソッドを呼び出します。

  2. create_dynamic_frame.from_catalog に関するドキュメントで確認してください。このメソッドには、database および table_name パラメータが必要です。create_dynamic_frame.from_catalog に対し、必要なパラメータを指定します。

    AWS Glue データカタログは前提条件セクションで設定されており、これにはソースデータの場所と形式に関する情報が保存されています。スクリプトに情報を直接提供する必要はありません。

  3. オプション — ジョブのブックマークをサポートするには、このメソッドに transformation_ctx パラメータを提供します。ジョブのブックマークに関する説明は、「オプション — ジョブのブックマークを有効にする」のセクションで確認できます。

注記

データ抽出用の一般的なメソッド

create_dynamic_frame_from_catalog は、AWS Glue データカタログ内のテーブルへの接続に使用されます

ソースの構造と場所を記述する設定でジョブを直接提供する必要がある場合は、「create_dynamic_frame_from_options」メソッドを参照してください。create_dynamic_frame.from_catalog を使用する場合に比べ、データを説明する詳細なパラメータの指定が必要になります。

必要なパラメータを特定するには、format_options および connection_parameters に関する補足文書を参照してください。スクリプトに対しソースデータの形式に関する情報を指定する方法については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。スクリプトに対しソースデータの場所に関する情報を指定する方法については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

ストリーミングソースから情報を読み取っている場合は、create_data_frame_from_catalog または create_data_frame_from_options メソッドを介して、ソースの情報をジョブに提供します。これらのメソッドは Apache Spark DataFrames を返すことに注意してください。

リファレンスドキュメントでは create_dynamic_frame_from_catalog を使用している部分のために、ここで生成されるコードは create_dynamic_frame.from_catalog を呼び出しています。これらのメソッドは、最終的には同じコードを呼び出すもので、よりクリーンなコードを作成できるようにインクルードされています。これは、aws-glue-libs にある Python のラッパーのソースを表示して確認できます。

ステップ 4. AWS Glue を使用してデータを変換する

ETL プロセスで抽出したソースデータについては、その変更方法を規定する必要があります。この情報を指定するには、AWS Glue Studio ビジュアルエディタで [Transform] (変換) ノードを作成します。

このステップでは、希望する現在のフィールドの名前とタイプのマップを ApplyMapping メソッドに提供し、DynamicFrame を変換します。

以下の変換を実行します。

  • 4 つの location および provinceキーを削除します。

  • officer の名前を officer_name に変更します。

  • ticket_number および set_fine_amount の型を float に変更します。

create_dynamic_frame.from_catalog により、DynamicFrame オブジェクトが提供されます。DynamicFrame は、AWS Glue 内のデータセットを表します。AWSGlue での変換は、DynamicFrames を変更する処理です。

注記

DynamicFrame について

DynamicFrame とは抽象化の 1 つであり、データ内のエントリの名前とタイプに関する記述を、データセットに接続できるようにします。Apache Spark にも同様の抽象化が存在し、DataFrame と呼ばれています。DataFrames の説明については、「Spark SQL Guide」(Spark SQL ガイド) を参照してください。

DynamicFrames を使用すると、データセットのスキーマを動的に記述できます。例えば、あるデータセットに価格列が含まれており、一部のエントリは価格を文字列として保存し、他のエントリは価格を double 型で格納している場合を考えます。AWSGlue は、オンザフライでスキーマを計算し、各行の自己記述型レコードを作成します。

一貫性のないフィールド (価格など) は、そのフレーム用のスキーマ内で、型 (ChoiceType) を使用しながら明示的に表されます。一貫性のないフィールドに対応するには、DropFields を使用して削除するか、ResolveChoice を使用して解決します。これらは、DynamicFrame で使用可能な変換です。その後、writeDynamicFrame を使用して、データをデータレイクに再度書き込むことができます。

DynamicFrame クラスにあるメソッドからは、多くの同様な変換を呼び出すことができます。これにより、スクリプトが読みやすくなります。DynamicFrame の詳細については、「DynamicFrame クラス」を参照してください。

この手順では、ApplyMapping を使用して次のコードを記述します。このコードは、生成されるサンプルスクリプトの一部です。

ApplyMapping_node2 = ApplyMapping.apply( frame=S3bucket_node1, mappings=[ ("tag_number_masked", "string", "tag_number_masked", "string"), ("date_of_infraction", "string", "date_of_infraction", "string"), ("ticket_date", "string", "ticket_date", "string"), ("ticket_number", "decimal", "ticket_number", "float"), ("officer", "decimal", "officer_name", "decimal"), ("infraction_code", "decimal", "infraction_code", "decimal"), ("infraction_description", "string", "infraction_description", "string"), ("set_fine_amount", "decimal", "set_fine_amount", "float"), ("time_of_infraction", "decimal", "time_of_infraction", "decimal"), ], transformation_ctx="ApplyMapping_node2", )
AWS Glue でデータを変換するには
  1. フィールドを変更や削除するための変換を特定するには、ドキュメントを参照してください。詳細については、「GlueTransform 基本クラス」を参照してください。ApplyMapping 変換を選択します。ApplyMapping の詳細については、「ApplyMapping クラス」を参照してください。ApplyMapping 変換オブジェクトから apply を呼び出します。

    注記

    ApplyMapping とは

    ApplyMappingDynamicFrame にアクセスし、それを変換します。これにより、フィールドの変換を表すタプルのリスト、つまり「マッピング」を取得します。最初の 2 つのタプル要素 (フィールドの名前と型) は、フレーム内のフィールドを識別するために使用されます。その次にある 2 つのパラメータも、フィールドの名前と型です。

    ApplyMapping は、ソースフィールドをターゲット名に変換し、新しい DynamicFrame を入力して返します。提供されていないフィールドは戻り値から削除されます。

    apply を呼び出す代わりに、DynamicFrame にある apply_mapping メソッドを使用して同様の変換を呼び出すと、さらにスムーズで読みやすいコードを作成できます。詳細については、「apply_mapping」を参照してください。

  2. 必要なパラメータを特定するには、ApplyMapping のドキュメントを参照してください。「ApplyMapping クラス」を参照してください。このメソッドでは、frame および mappings パラメータが必要です。ApplyMapping に対し、必要なパラメータを指定します。

  3. オプション — ジョブのブックマークをサポートするには、メソッドに transformation_ctx を提供します。ジョブのブックマークに関する説明は、「オプション — ジョブのブックマークを有効にする」のセクションで確認できます。

注記

Apache Spark の機能

ジョブ内のETLワークフローを効率化するための変換機能が提供されています。また、より一般的な目的のために構築された、ジョブ内の Spark プログラムで使用できるライブラリにもアクセスできます。これらを使用するには、DynamicFrameDataFrame の間で変換します。

toDF を使用して DataFrame を作成できます。次に、DataFrame で使用可能なメソッドを使用してデータセットを変換できます。これらのメソッドの詳細については、「DataFrame」を参照してください。その後、フレームをターゲットにロードする AWS Glue オペレーションを使用するため、fromDF によって変換を元に戻すことができます。

Step 5. ターゲットにデータをロードする

通常、変換後のデータは、そのソースとは別の場所に保存します。この操作を実行するには、AWS Glue Studio ビジュアルエディタで、[Target] (ターゲット) ノードを作成します

このステップでは、データを Amazon S3 のターゲットバケットにロードするための write_dynamic_frame.from_options メソッド connection_typeconnection_optionsformatformat_options を提供します。

ステップ 1 において、GlueContext オブジェクトは初期化済みです。AWS Glue では、ソースと同様、ターゲットの設定に使用されるメソッドを提供しています。

この手順では、write_dynamic_frame.from_options を使用して次のコードを記述します。このコードは、生成されるサンプルスクリプトの一部です。

S3bucket_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2, connection_type="s3", format="glueparquet", connection_options={"path": "s3://DOC-EXAMPLE-BUCKET", "partitionKeys": []}, format_options={"compression": "gzip"}, transformation_ctx="S3bucket_node3", )
データをターゲットにロードするには
  1. ターゲットの Amazon S3 バケットに対しデータをロードするメソッドは、ドキュメントで確認してください。これらの方法については、「GlueContext クラス」を参照してください。write_dynamic_frame_from_options メソッドを選択します。glueContext でこのメソッドを呼び出します。

    注記

    データロード用の一般的なメソッド

    write_dynamic_frame.from_options は、データのロードに使用される最も一般的なメソッドです。このメソッドでは、AWS Glueで使用可能なすべてのターゲットがサポートされています。

    AWS Glue 接続で定義された JDBC ターゲットに書き込む場合は、write_dynamic_frame_from_jdbc_conf メソッドを使用します。AWSGlue 接続に、データソースへの接続方法に関する情報が保存されます。これにより、connection_options でそれらの情報を入力する必要がなくなります。ただし、dbtable を提供するためには、引き続き connection_options を使用する必要があります。

    write_dynamic_frame.from_catalog は、データロード用として、一般的なメソッドではありません。このメソッドは基盤データセットを更新せずに AWS Glue データカタログを更新します。これは、基盤のデータセットを変更する他のプロセスと組み合わせて使用するメソッドです。詳細については、「AWS Glue ETL ジョブを使用してデータカタログのスキーマを更新し、新規パーティションを追加する」を参照してください。

  2. write_dynamic_frame_from_options に関するドキュメントで確認してください。このメソッドでは、frameconnection_typeformatconnection_optionsformat_options が必要です。glueContext でこのメソッドを呼び出します。

    1. 必要なパラメータを特定するには、format_options および format に関する補足文書を参照してください。データ形式の説明については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

    2. 必要なパラメータを特定するには、connection_type および connection_options に関する補足文書を参照してください。接続の説明については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

    3. write_dynamic_frame.from_options に対し、必要なパラメータを指定します。このメソッドの設定は、create_dynamic_frame.from_options と似ています。

  3. オプション — ジョブのブックマークをサポートするには、transformation_ctxwrite_dynamic_frame.from_options に提供します。ジョブのブックマークに関する説明は、「オプション — ジョブのブックマークを有効にする」のセクションで確認できます。

ステップ 6。Job オブジェクトのコミット

Job オブジェクトは、ステップ 1 で初期化してあります。スクリプトの終了時に、ライフサイクルを手動で閉じる必要があります。一部のオプション機能が正しく機能するために、これが必要となります。この作業は、AWS Glue Studioの背後で実行されます。

このステップでは、Job オブジェクトの commit メソッドを呼び出します。

この手順では、次のコードを作成します。このコードは、生成されるサンプルスクリプトの一部です。

job.commit()
Job オブジェクトをコミットするには
  1. まだ行っていない場合は、前のセクションで概説したオプションの手順を実行して transformation_ctx を含めます。

  2. commit を呼び出します。

オプション — ジョブのブックマークを有効にする

これまでのすべてのステップでは、transformation_ctx パラメータを設定することを促しています。これは、ジョブのブックマークと呼ばれる機能に関連した設定です。

ジョブのブックマークを使用すると、以前の作業を簡単に追跡できるデータセットに対して定期的に実行されるジョブによって、時間とコストを節約できます。ジョブのブックマークは、前回の実行のデータセット全体に関する AWS Glue 変換の進捗状況を追跡します。以前の実行が終了した場所を追跡することで、AWS Glue は未処理の行に対してのみ作業できます。ブックマークの詳細については、「ジョブのブックマークを使用した処理済みデータの追跡」を参照してください。

前の例で説明したように、ジョブのブックマークを有効にするには、始めに、提供されている関数に transformation_ctx ステートメントを追加します。ジョブのブックマークの状態は実行後も保持されます。transformation_ctx パラメータは、その状態にアクセスするために使用されるキーです。これらのステートメント単体では機能を持ちません。また、ジョブのための設定内で、その機能を有効にする必要があります。

この手順では、AWS Management Console を使用してジョブのブックマークを有効にします。

ジョブのブックマークを有効にするには
  1. 対象となるジョブの、[Job details] (ジョブの詳細) セクションに移動します。

  2. [Job bookmark] (ジョブのブックマーク) で [Enable] (有効化) 設定します。

ステップ 7。コードをジョブとして実行する

このステップでは、チュートリアルを正常に完了したことを確認するために、ジョブを実行します。これは、AWS Glue Studio ビジュアルエディタの中で、ボタンをクリックするだけで完了します。

コードをジョブとして実行するには
  1. タイトルバーで [Untitled job] (無題のジョブ) 選択し、ジョブ名を編集あるいは設定します。

  2. [Job details] (ジョブの詳細) タブを開きます。ジョブに [IAM Role] (IAM ロール) を割り当てます。AWS Glue Studio チュートリアルの前提条件にある、AWS CloudFormation テンプレートで作成したものを使用できます。このチュートリアルを完了していれば、通常は、AWS Glue StudioRole のように表示されています。

  3. [Save] (保存) を選択して、スクリプトを保存します。

  4. [Run] (実行) を選択して、ジョブを実行します。

  5. [Runs] (実行) タブに移動して、ジョブが完了したことを確認します。

  6. write_dynamic_frame.from_options のターゲットである、DOC-EXAMPLE-BUCKET に移動します。想定どおりに出力されていることを確認します。

ジョブの設定と管理の詳細については、「独自のカスタムスクリプトの提供」を参照してください。

詳細情報

Apache Spark のライブラリとメソッドを、AWS Glue スクリプトの中で使用することが可能です。Spark のドキュメントを参照することで、これらの付属ライブラリで何が行えるのか把握できます。詳細については、「examples section of the Spark source repository」(Spark ソースリポジトリのサンプルセクション) を参照してください。

AWS Glue 2.0+ には、デフォルトで、いくつかの一般的な Python ライブラリが含まれています。Scala または Python 環境の AWS Glue ジョブに独自の依存関係をロードするためのメカニズムもあります。Python の依存関係については、「AWS Glue での Python ライブラリの使用」を参照してください。

Python で AWS Glue の機能を使用する方法の例の詳細については、「AWS Glue Python コードサンプル」を参照してください。Scala と Python ジョブには同等の機能があるため、Python の例を基にして、同様の処理を Scala で行う方法についても考察することが可能です。