AWS Glueは、分析を行うユーザーが複数のソースからのデータを簡単に検出、準備、移動、統合できるようにするサーバーレスのデータ統合サービスです。AWS Glue ジョブを使用して抽出、変換、ロード (ETL) パイプラインを実行し、データをデータレイクにロードできます。AWS Glue の詳細については、AWS Glue デベロッパーガイド の「What is AWS Glue?」を参照してください。
AWS Glue ジョブには、ソースデータに接続して処理し、データターゲットに書き出すスクリプトがカプセル化されています。通常、ジョブは、抽出、変換、ロード (ETL) スクリプトを実行します。ジョブは、Apache Spark および Ray ランタイム環境用に設計されたスクリプト、または汎用 Python スクリプト (Python シェルジョブ) を実行できます。ジョブ実行をモニタリングすると、完了ステータス、継続時間、開始時間などのランタイムメトリクスを知ることができます。
Amazon S3 Tables Catalog for Apache Iceberg クライアントカタログ JAR を使用してテーブルに直接接続することで、AWS Glue ジョブを使用して S3 テーブルのデータを処理できます。
注記
S3 Tables は、AWS Glue バージョン 5.0 以降でサポートされています。
S3 Tables ETL AWS Glue ジョブの前提条件
AWS Glue ジョブからテーブルに対してクエリを実行する前に、AWS Glue がジョブの実行に使用できる IAM ロールを設定し、ジョブの実行時に AWS Glue がアクセスできる Amazon S3 バケットに Amazon S3 Tables Catalog for Apache Iceberg JAR をアップロードする必要があります。
-
AmazonS3TablesFullAccess
管理ポリシーをロールにアタッチします。AmazonS3FullAccess
管理ポリシーをロールにアタッチします。
Maven から最新バージョンの Amazon S3 Tables Catalog for Apache Iceberg クライアントカタログ JAR をダウンロードし、Amazon S3 バケットにアップロードします。
Maven Central
で最新バージョンを検索します。ブラウザまたは次のコマンドを使用して、Maven central から JAR をダウンロードできます。必ず バージョン番号
を最新バージョンに置き換えてください。wget https://repo1.maven.org/maven2/software/amazon/s3tables/s3-tables-catalog-for-iceberg-runtime/
0.1.4
/s3-tables-catalog-for-iceberg-runtime-0.1.4
.jarダウンロードした JAR を、AWS Glue IAM ロールがアクセスできる S3 バケットにアップロードします。JAR をアップロードするには、次の AWS CLI コマンドを実行します。
バージョン番号
を最新バージョンに置き換え、バケット名とパス
を独自のバージョンに置き換えます。aws s3 cp s3-tables-catalog-for-iceberg-runtime-
0.1.4
.jar s3://amzn-s3-demo-bucket1
/jars/
S3 テーブルをクエリする AWS Glue ETL PySpark スクリプトを作成する
AWS Glue ETL ジョブの実行時にテーブルデータにアクセスするには、PySpark スクリプトを使用して、ジョブの実行時に S3 テーブルバケットに接続する Apache Iceberg の Spark セッションを設定します。既存のスクリプトを変更してテーブルバケットに接続するか、新しいスクリプトを作成できます。AWS Glue スクリプトの作成の詳細については、「AWS Glue デベロッパーガイド」の「チュートリアル: AWS Glue for Spark スクリプトの記述」を参照してください。
PySpark スクリプトで次のコードスニペットを使用して、テーブルバケットへの Spark の接続を設定します。プレースホルダー値
を独自のテーブルバケットの情報に置き換えます。
# Configure Spark session for Iceberg spark_conf = SparkSession.builder.appName("GlueJob") .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:
REGION
:111122223333
:bucket/amzn-s3-demo-table-bucket
") .config("spark.sql.defaultCatalog", "s3tablesbucket") .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.catalog.s3tablesbucket.cache-enabled", "false")
サンプルスクリプト
以下は、AWS Glue ジョブで S3 テーブルのクエリをテストするために使用できるサンプル PySpark スクリプトです。スクリプトはテーブルバケットに接続し、新しい名前空間の作成、サンプルテーブルの作成、テーブルへのデータの挿入、テーブルデータへのクエリを実行します。スクリプトを使用するには、プレースホルダー値
を独自のテーブルバケットの情報に置き換えます。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql import SparkSession # Configure Spark session for Iceberg spark_conf = SparkSession.builder.appName("GlueJob") .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:
REGION
:111122223333
:bucket/amzn-s3-demo-table-bucket
") .config("spark.sql.defaultCatalog", "s3tablesbucket") .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.catalog.s3tablesbucket.cache-enabled", "false") # Initialize Glue context with custom Spark configuration sc = SparkContext.getOrCreate(conf=spark_conf.getOrCreate().sparkContext.getConf()) glueContext = GlueContext(sc) spark = glueContext.spark_session ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) job = Job(glueContext) job.init(args['JOB_NAME'], args) namespace = "new_namespace
" table = "new_table
" def run_sql(query): try: result = spark.sql(query) result.show() return result except Exception as e: print(f"Error executing query '{query}': {str(e)}") return None def main(): try: # Create a new namespace if it doesn't exist print("CREATE NAMESPACE") run_sql(f"CREATE NAMESPACE IF NOT EXISTS {namespace}") # Show all namespaces print("SHOW NAMESPACES") run_sql("SHOW NAMESPACES") # Describe a specific namespace print("DESCRIBE NAMESPACE") run_sql(f"DESCRIBE NAMESPACE {namespace}") # Create table in the namespace print("CREATE TABLE") create_table_query = f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """ run_sql(create_table_query) # Insert data into table print("INSERT INTO") insert_query = f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """ run_sql(insert_query) # Show tables in the namespace print("SHOW TABLES") run_sql(f"SHOW TABLES IN {namespace}") # Select all from a specific table print("SELECT FROM TABLE") run_sql(f"SELECT * FROM {namespace}.{table} LIMIT 20") except Exception as e: print(f"An error occurred in main execution: {str(e)}") raise # Re-raise the exception for Glue to handle finally: job.commit() if __name__ == "__main__": try: main() except Exception as e: print(f"Job failed with error: {str(e)}") sys.exit(1)
S3 テーブルをクエリする AWS Glue ETL ジョブを作成する
S3 テーブルの AWS Glue ジョブを設定するときは、ジョブがテーブルに直接クエリできるように、追加の依存関係として Amazon S3 Tables Catalog for Apache Iceberg JAR を含めます。次の手順では、AWS CLI を使用して、または AWS Glue Studio でコンソールを使用して、これを行う方法を示します。AWS Glue Studio は、ビジュアルインターフェイス、インタラクティブコードノートブック、またはスクリプトエディタを使用してジョブを作成します。詳細については、「AWS Glue ユーザーガイド」の「AWS Glue でジョブを作成する」を参照してください。
次の手順は、AWS Glue Studio スクリプトエディタを使用して S3 テーブルをクエリする ETL ジョブを作成する方法を示しています。
https://console.aws.amazon.com/glue/
で AWS Glue コンソール を開きます。 ナビゲーションペインで、[ETL ジョブ] を選択します。
[スクリプトエディタ] を選択し、[スクリプトのアップロード] を選択し、S3 テーブルをクエリするために作成した PySpark スクリプトをアップロードします。
[ジョブの詳細] タブを選択し、[基本プロパティ] に次のように入力します。
[名前] で、ジョブの名前を入力します。
[IAM ロール] で、作成した AWS Glue ロールを選択します。
[詳細プロパティ] を展開し、[依存 JARS パス] に、前提条件として S3 バケットにアップロードしたクライアントカタログ jar の S3 URI を入力します。例えば、s3://
amzn-s3-demo-bucket1
/jars
/s3-tables-catalog-for-iceberg-runtime-0.1.4
.jar[保存] を選択してジョブを作成します。
[実行] を選択してジョブを開始し、[実行] タブでジョブのステータスを確認します。
次の手順は、AWS CLI を使用して S3 テーブルをクエリする ETL ジョブを作成する方法を示しています。コマンドを使用するには、プレースホルダー値
を独自の値に置き換えます。
前提条件
S3 テーブルをクエリする AWS Glue ETL PySpark スクリプトを作成する、これを S3 バケットにアップロードします。
Glue ジョブを作成します。
aws glue create-job \ --name
etl-tables-job
\ --role arn:aws:iam::111122223333
:role/AWSGlueServiceRole
\ --command '{ "Name": "glueetl", "ScriptLocation": "s3://amzn-s3-demo-bucket1
/scripts/glue-etl-query.py
", "PythonVersion": "3" }' \ --default-arguments '{ "--job-language": "python", "--class": "GlueApp", "--extra-jars": "s3://amzn-s3-demo-bucket1
/jars/
s3-tables-catalog-for-iceberg-runtime-0.1.4
.jar" }' \ --glue-version "5.0"ジョブを再開してください。
aws glue start-job-run \ --job-name
etl-tables-job
ジョブのステータスを確認するには、前のコマンドの実行 ID をコピーして、次のコマンドに入力します。
aws glue get-job-run --job-name
etl-tables-job
\ --run-idjr_ec9a8a302e71f8483060f87b6c309601ea9ee9c1ffc2db56706dfcceb3d0e1ad