翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon EMR Serverless を使用して Spark ジョブを実行する
ログ処理、特徴量エンジニアリング、複雑な ETL、または科学分析のために Spark ワークロードを実行するデータエンジニアリングチームには、多くの場合、オンプレミスの取り込みパイプライン、NFS または SMB データムーバー、またはボリュームを直接マウントするアプリケーションによって書き込まれた FSx for ONTAP ボリュームのソースデータがあります。
Amazon S3 アクセスポイントをボリュームにアタッチすると、Amazon EMR Serverless はアクセスポイントを介してデータを読み取り、それに対して Spark ジョブを実行し、結果を同じボリュームに書き込みます。Amazon EMR Serverless はクラスターのライフサイクルを自動的に処理します。ジョブを送信し、実行した秒数に対して支払います。
このパターンは、Spark ランタイム全体 (カスタムライブラリ、反復アルゴリズム、長時間実行される変換、Amazon EMR Studio 経由のインタラクティブノートブック) を必要とするワークロードに適しています。Amazon Athena for SQL と AWS Glue マネージド ETL の軽量オプションは適切ではありません。これらの代替方法の詳細については、Amazon Athena を使用して SQL でファイルをクエリする「」および「」を参照してくださいを使用して ETL パイプラインを構築する AWS Glue。
このチュートリアルでは、FSx for ONTAP ボリュームでステージングされた 1 年間の NOAA Global Surface Summary of the Day (GSOD) 観測値を集計する気象チームをシミュレートします。未加工の CSV ファイルを読み取り、毎月のステーションごとの集計 (平均温度、合計降水量、降水イベントの発生日数) を計算し、結果を月単位でパーティション分割された Parquet として書き込む PySpark ジョブをアクセスポイント経由で送信します。
注記
このチュートリアルの所要時間は約 30~40 分です。 AWS のサービス 使用する には、作成したリソースの料金が発生します。クリーンアップセクションを含むすべてのステップをすぐに完了すると、米国東部 (バージニア北部) の予想コストは 1 USD 未満になります。 AWS リージョンこの見積もりには、FSx for ONTAP ボリューム自体の継続的な料金は含まれません。
前提条件
Amazon S3 アクセスポイントがアタッチされた FSx for ONTAP ボリューム。 Amazon S3 Amazon EMR Serverless サービスがアクセスポイントに到達できるようにするには、アクセスポイントにインターネットネットワークオリジンが必要です。手順については、「アクセスポイントの作成」を参照してください。
AWS CLI バージョン 2 は、IAM ロールと Amazon EMR Serverless リソースを作成できる認証情報でインストールおよび設定されています。
ステップ 1: サンプルデータセットをアクセスポイントにアップロードする
NOAA GSOD データセットは、毎日の気象観測のパブリックデータセットであり、ステーションごとに年間 1 つの CSV ファイルです。このチュートリアルでは、パブリック noaa-gsod-pds Amazon S3 バケットから 100 ステーションのサブセットをダウンロードし、アクセスポイントにアップロードします。
-
2024 年の最初の 100 個のステーションファイルをダウンロードします。
$mkdir -p ~/gsod && cd ~/gsod aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt while read f; do aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors done < files.txt ls | wc -lコマンドは、合計約 7~8 MB の CSV ファイル約 100 個をダウンロードします。
-
gsod/2024/プレフィックスの下にあるアクセスポイントにファイルをアップロードします。access-point-aliasをアクセスポイントエイリアスに置き換えます。$aws s3 cp ~/gsod/ "s3://access-point-alias/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors
ステップ 2: PySpark ジョブを記述する
ジョブは、入力プレフィックスの下ですべての CSV ファイルを読み取り、欠落データを表すセンチネル値をフィルタリングし、FRSHTTビットフィールド (Fog、Rain、Snow、Hail、Thunder、Tornado) を解析して降水イベントの日数をカウントし、 ごとに集計して(station, month)、パーティション化された Parquet をアクセスポイントに書き込みます。
-
次のスクリプトを という名前のファイルに保存します
gsod_monthly.py。# gsod_monthly.py import sys from pyspark.sql import SparkSession from pyspark.sql import functions as F INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2] # GSOD sentinels for missing data TEMP_SENTINEL = 9999.9 PRCP_SENTINEL = 99.99 spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate() raw = spark.read.option("header", True).csv(INPUT_PATH) cleaned = (raw .select( F.col("STATION").alias("station"), F.col("NAME").alias("station_name"), F.col("LATITUDE").cast("double").alias("lat"), F.col("LONGITUDE").cast("double").alias("lon"), F.to_date("DATE", "yyyy-MM-dd").alias("date"), F.col("TEMP").cast("double").alias("temp_f"), F.col("PRCP").cast("double").alias("prcp_in"), F.col("FRSHTT").alias("frshtt"), ) .filter(F.col("temp_f") != TEMP_SENTINEL) .withColumn("month", F.date_format("date", "yyyy-MM")) .withColumn( "prcp_in", F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")), ) # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado. # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events. .withColumn( "had_precip_event", F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0), ) ) monthly = (cleaned .groupBy("station", "station_name", "lat", "lon", "month") .agg( F.avg("temp_f").alias("avg_temp_f"), F.min("temp_f").alias("min_temp_f"), F.max("temp_f").alias("max_temp_f"), F.sum("prcp_in").alias("total_prcp_in"), F.sum("had_precip_event").alias("precip_event_days"), F.count("*").alias("observation_days"), ) ) (monthly.write .mode("overwrite") .partitionBy("month") .parquet(OUTPUT_PATH)) spark.stop() -
scripts/プレフィックスの下にあるアクセスポイントにスクリプトをアップロードします。$aws s3 cp gsod_monthly.py "s3://access-point-alias/scripts/gsod_monthly.py"
ステップ 3: Amazon EMR Serverless ジョブロールを作成する
Amazon EMR Serverless は、ジョブの実行時に IAM 実行ロールを引き受けます。ロールには、アクセスポイントの読み取りと書き込み、および CloudWatch Logs へのログの書き込みのためのアクセス許可が必要です。セットアップ手順については、次のセクションを展開します。
-
次の信頼ポリシーを として保存します
emr-trust-policy.json。これにより、Amazon EMR Serverless がロールを引き受けることができます。{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "emr-serverless.amazonaws.com"}, "Action": "sts:AssumeRole" }] } -
次のアクセス許可ポリシーを として保存します
emr-permissions.json。region、account-id、およびaccess-point-nameを自分の値に置き換えます。{ "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups", "logs:DescribeLogStreams" ], "Resource": "*" }, { "Sid": "APRead", "Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "APWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" } ] } -
ロールを作成し、ポリシーをアタッチします。
$aws iam create-role --role-name fsxn-emr-job-role \ --assume-role-policy-document file://emr-trust-policy.json aws iam put-role-policy --role-name fsxn-emr-job-role \ --policy-name emr-access --policy-document file://emr-permissions.json
ステップ 4: Amazon EMR Serverless アプリケーションを作成して起動する
Amazon EMR Serverless アプリケーションは、特定のリリースラベルとエンジン (Spark または Hive) の存続期間の長いコンピューティング環境です。1 つ以上のジョブを送信します。アプリケーションは、ジョブの需要に基づいて自動的にコンピューティングをスケールアップ/ダウンし、ジョブが実行されていない場合はアイドルアウトします。
-
最新の Amazon EMR リリースを使用して Spark アプリケーションを作成します。
$aws emr-serverless create-application \ --name fsxn-emr-app --type SPARK --release-label emr-7.0.0レスポンス内の
applicationIdを書き留めます。 -
アプリケーションの起動 を開始すると、ワーカーの小さなプールが事前ウォーミングされるため、最初のジョブはコールドスタートの遅延なしで実行されます。
$aws emr-serverless start-application --application-idapplication-id状態が になるまで待ちます
STARTED。$aws emr-serverless get-application --application-idapplication-id\ --query 'application.state'
ステップ 5: Spark ジョブを送信する
アプリケーション ID と実行ロールを使用してジョブを送信します。ジョブはgsod/2024/、アクセスポイントを介して、未加工の CSVs を から読み取りgsod-monthly/、パーティション化された Parquet を に書き込みます。
-
ジョブドライバー設定を として保存します
job-driver.json。プレースホルダーを置き換えます。{ "sparkSubmit": { "entryPoint": "s3://access-point-alias/scripts/gsod_monthly.py", "entryPointArguments": [ "s3://access-point-alias/gsod/2024/", "s3://access-point-alias/gsod-monthly/" ], "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2" } } -
次のモニタリング設定を として保存します
job-config.json。ドライバーとエグゼキュターのログを CloudWatch Logs に送信します。{ "monitoringConfiguration": { "cloudWatchLoggingConfiguration": { "enabled": true, "logGroupName": "/aws/emr-serverless/fsxn-emr-app" } } } -
ジョブを送信します。
$aws emr-serverless start-job-run \ --application-idapplication-id\ --execution-role-arn arn:aws:iam::account-id:role/fsxn-emr-job-role \ --name gsod-monthly \ --job-driver file://job-driver.json \ --configuration-overrides file://job-config.jsonレスポンス内の
jobRunIdを書き留めます。 -
ジョブのステータスをポーリングします。ジョブは から
SCHEDULEDRUNNINGに移行しますSUCCESS。$aws emr-serverless get-job-run \ --application-idapplication-id\ --job-run-idjob-run-id\ --query 'jobRun.state'
注記
ジョブが失敗した場合、ロググループ の CloudWatch Logs でドライバーログを確認します/aws/emr-serverless/fsxn-emr-app。Amazon EMR Serverless は、ジョブの実行ごとに 1 つのログストリームを書き込みます。
ステップ 6: 出力を検査する
ジョブが 1 か月あたり 1 つの Parquet パーティションを書き込み、出力が読み取り可能であることを確認します。
-
出力パーティションを一覧表示します。
$aws s3 ls "s3://access-point-alias/gsod-monthly/" --recursivemonth=YYYY-MM/パーティションごとに 1 つの Parquet ファイルと、ルートに_SUCCESSマーカーが表示されます。 -
パーティションをローカルに読み取り、コンテンツを確認します。
$aws s3 cp "s3://access-point-alias/gsod-monthly/month=2024-06/" . \ --recursive --exclude "_SUCCESS" python3 -c "import pyarrow.parquet as pq; \ t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \ print(t.schema); print(t.to_pandas().head())"出力スキーマには、
station、、station_name、lat、lon、avg_temp_f、min_temp_f、、precip_event_days、max_temp_ftotal_prcp_inおよび が含まれますobservation_days。
パターンの拡張
Spark SQL を使用して出力をクエリします。パーティション化された出力を テーブルとして に登録 AWS Glue Data Catalog し、Spark SQL、Athena、または AWS Glue カタログテーブルを読み取るその他のツールを使用してクエリを実行します。アクセスポイントベースのデータセットを登録する手順については、「」を参照してくださいAmazon Athena を使用して SQL でファイルをクエリする。
ACID 書き込みには Iceberg を使用します。データを更新またはマージするワークロードの場合、プレーン Parquet ではなくアクセスポイントの Iceberg テーブルに書き込むようにジョブを設定します。Amazon EMR Serverless には、最近のリリースラベルにデフォルトで Iceberg ランタイムが含まれています。
Amazon EMR Studio でインタラクティブに を実行します。Jupyter ノートブックを Amazon EMR Serverless アプリケーションにアタッチして、データをインタラクティブに探索します。「Amazon EMR Serverless ユーザーガイド」の「Amazon EMR Serverless を使用したインタラクティブワークロード」を参照してください。
ジョブをスケジュールします。Amazon EventBridge スケジューラまたは AWS Step Functions を使用して、定期的なスケジュールでジョブを実行します (たとえば、新しい日のデータ量がボリュームに加算された場合など)。
トラブルシューティング
- アクセスポイント
AccessDeniedの でジョブが失敗する ジョブロールポリシーがアクセスポイント ARN
s3:ListBucketでs3:GetObjectと を許可し (バケットではなく)、アクセスポイントにインターネットネットワークオリジンがあり、Amazon EMR Serverless サービスがそれに到達できることを確認します。- ジョブは成功したが、出力が空
入力パスを確認します。Amazon S3 はプレフィックスをリテラルに
ListObjectsV2扱うため、s3://alias/gsod/2024(末尾のスラッシュなし) とs3://alias/gsod/2024/(末尾のスラッシュ) の動作が異なる場合があります。ファイルのディレクトリを指すときは、末尾のスラッシュを含めます。- ドライバーログが CloudWatch Logs にない
モニタリング設定は
start-job-run、アプリケーションではなく--configuration-overridesで渡す必要があります。各ジョブ実行は、設定されたロググループの独自のログストリームに書き込みます。
クリーンアップ
アプリケーションの停止と削除、IAM ロールの削除、不要になったアップロード済みデータの削除を行います。
$aws emr-serverless stop-application --application-idapplication-idaws emr-serverless delete-application --application-idapplication-idaws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access aws iam delete-role --role-name fsxn-emr-job-role aws s3 rm "s3://access-point-alias/scripts/gsod_monthly.py" aws s3 rm "s3://access-point-alias/gsod/" --recursive aws s3 rm "s3://access-point-alias/gsod-monthly/" --recursive