本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
Apache Spark 預覽版的生成式 AI 升級可在下列 AWS 區域中供 AWS Glue 使用:美國東部 (俄亥俄)、美國東部 (維吉尼亞北部)、美國西部 (奧勒岡)、亞太區域 (東京) 和亞太區域 (雪梨)。預覽功能可能會有所變更。 |
Glue AWS 中的 Spark 升級可讓資料工程師和開發人員使用生成式 AI,將現有的 AWS Glue Spark 任務升級和遷移到最新的 Spark 版本。資料工程師可以使用它來掃描其 AWS Glue Spark 任務、產生升級計劃、執行計劃,以及驗證輸出。它透過自動化識別和更新 Spark 指令碼、組態、相依性、方法和功能等未區分的工作,來減少 Spark 升級的時間和成本。

運作方式
當您使用升級分析時, AWS Glue 會識別任務程式碼中版本和組態之間的差異,以產生升級計劃。升級計劃會詳細說明所有程式碼變更,以及必要的遷移步驟。接下來, AWS Glue 會在沙盒環境中建置並執行升級的應用程式,以驗證變更,並產生程式碼變更清單,供您遷移任務。您可以檢視更新的指令碼,以及詳細說明提議變更的摘要。執行您自己的測試後,接受變更,Glue AWS 任務會自動更新為最新版本,並搭配新的指令碼。
升級分析程序可能需要一些時間才能完成,具體取決於任務的複雜性和工作負載。升級分析的結果會存放在指定的 Amazon S3 路徑中,您可以檢閱這些路徑來了解升級和任何潛在的相容性問題。檢閱升級分析結果後,您可以決定是否繼續進行實際升級,或在升級之前對任務進行任何必要的變更。
先決條件
使用生成式 AI 升級 Glue AWS 中的任務時,需要下列先決條件:
-
AWS Glue 2 PySpark 任務 – 只有 AWS Glue 2 任務可以升級至 AWS Glue 4。
-
需要 IAM 許可才能開始分析、檢閱結果並升級您的任務。如需詳細資訊,請參閱下許可節中的範例。
-
如果使用 AWS KMS 加密分析成品或服務來加密用於分析的資料,則需要額外的 AWS KMS 許可。如需詳細資訊,請參閱下AWS KMS 政策節中的範例。
許可
-
使用下列許可更新呼叫者的 IAM 政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["glue:StartJobUpgradeAnalysis", "glue:StartJobRun", "glue:GetJobRun", "glue:GetJob", "glue:BatchStopJobRun" ], "Resource": [ "arn:aws:glue:us-east-1:123456789012:job/jobName" ] }, { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": [ "<s3 script location associated with the job>" ] }, { "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": [ "<result s3 path provided on API>" ] }, { "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKey", ], "Resource": "<key-arn-passed in the API>" } ] }
注意
如果您使用的是兩個不同的 AWS KMS 金鑰,一個用於結果成品加密,另一個用於服務中繼資料加密,則政策需要包含兩個金鑰的類似政策。
-
更新您要升級之任務的執行角色,以包含下列內嵌政策:
{ "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": [ "ARN of the Amazon S3 path provided on API", "ARN of the Amazon S3 path provided on API/*" ] }
例如,如果您使用的是 Amazon S3 路徑
s3://amzn-s3-demo-bucket/upgraded-result
,則政策為:{ "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/upgraded-result/", "arn:aws:s3:::amzn-s3-demo-bucket/upgraded-result/*" ] }
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["glue:GetJobUpgradeAnalysis"],
"Resource": [
"arn:aws:glue:us-east-1:123456789012:job/jobName"
]
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["glue:StopJobUpgradeAnalysis",
"glue:BatchStopJobRun"
],
"Resource": [
"arn:aws:glue:us-east-1:123456789012:job/jobName"
]
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["glue:ListJobUpgradeAnalyses"],
"Resource": [
"arn:aws:glue:us-east-1:123456789012:job/jobName"
]
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["glue:UpdateJob",
"glue:UpgradeJob"
],
"Resource": [
"arn:aws:glue:us-east-1:123456789012:job/jobName"
]
},
{
"Effect": "Allow",
"Action": ["iam:PassRole"],
"Resource": [
"<Role arn associated with the job>"
]
}
]
}
AWS KMS 政策
若要在開始分析時傳遞您自己的自訂 AWS KMS 金鑰,請參閱下一節,以設定 AWS KMS 金鑰的適當許可。
您需要許可 (加密/解密) 才能傳遞金鑰。在下面的政策範例中,<IAM Customer caller ARN>
允許 指定的 AWS 帳戶或角色執行允許的 動作:
-
kms:Decrypt 允許使用指定的 AWS KMS 金鑰進行解密。
-
kms:GenerateDataKey 允許使用指定的金鑰產生資料 AWS KMS 金鑰。
{
"Effect": "Allow",
"Principal":{
"AWS": "<IAM Customer caller ARN>"
},
"Action": [
"kms:Decrypt",
"kms:GenerateDataKey",
],
"Resource": "<key-arn-passed-on-start-api>"
}
您需要授予許可給 AWS Glue,以使用 AWS KMS 金鑰進行金鑰的加密和解密。
{
"Effect": "Allow",
"Principal":{
"Service": "glue.amazonaws.com"
},
"Action": [
"kms:Decrypt",
"kms:GenerateDataKey",
],
"Resource": "<key-arn>",
"Condition": {
"StringLike": {
"aws:SourceArn": "arn:aws:glue:<region>:<aws_account_id>:job/job-name"
}
}
}
此政策可確保您同時擁有 AWS KMS 金鑰的加密和解密許可。
{
"Effect": "Allow",
"Principal":{
"AWS": "<IAM Customer caller ARN>"
},
"Action": [
"kms:Decrypt",
"kms:GenerateDataKey",
],
"Resource": "<key-arn-passed-on-start-api>"
}
執行升級分析並套用升級指令碼
您可以執行升級分析,這會針對您從任務檢視中選取的任務產生升級計劃。
-
在任務中,選取 AWS Glue 2.0 任務,然後從動作功能表中選擇執行升級分析。
-
在模態中,選取路徑,將產生的升級計畫存放在結果路徑中。這必須是您可以存取和寫入的 Amazon S3 儲存貯體。
-
視需要設定其他選項:
-
執行組態 – 選用:執行組態是一種選用設定,可讓您自訂在升級分析期間執行的驗證執行的各個層面。此組態用來執行升級的指令碼,並可讓您選取運算環境屬性 (工作者類型、工作者數量等)。請注意,您應該在檢閱、接受變更並將其套用至生產環境之前,使用您的非生產開發人員帳戶,對範例資料集執行驗證。執行組態包含下列可自訂的參數:
-
工作者類型:您可以指定要用於驗證執行的工作者類型,讓您可以根據您的需求選擇適當的運算資源。
-
工作者數量:您可以定義要佈建以進行驗證執行的工作者數量,讓您可以根據工作負載需求擴展資源。
-
任務逾時 (以分鐘為單位):此參數可讓您設定驗證執行的時間限制,確保任務在指定的持續時間之後終止,以防止資源消耗過多。
-
安全組態:您可以設定安全設定,例如加密和存取控制,以確保在驗證執行期間保護您的資料和資源。
-
其他任務參數:如有需要,您可以新增任務參數,進一步自訂驗證執行的執行環境。
透過利用執行組態,您可以量身打造驗證執行,以符合您的特定需求。例如,您可以設定驗證執行以使用較小的資料集,讓分析更快速地完成並最佳化成本。此方法可確保有效執行升級分析,同時將驗證階段的資源使用率和相關成本降至最低。
-
-
加密組態 – 選用:
-
啟用升級成品加密:在將資料寫入結果路徑時啟用靜態加密。如果您不想加密升級成品,請將此選項保留為未選取狀態。
-
自訂服務中繼資料加密:您的服務中繼資料預設為使用 AWS 擁有的金鑰加密。如果您想要使用自己的金鑰進行加密,請選擇此選項。
-
-
-
選擇執行以開始升級分析。分析執行時,您可以在升級分析索引標籤上檢視結果。分析詳細資訊視窗會顯示分析的相關資訊,以及升級成品的連結。
-
結果路徑 – 這是存放結果摘要和升級指令碼的位置。
-
Amazon S3 中的升級指令碼 – Amazon S3 中升級指令碼的位置。您可以在套用升級之前檢視指令碼。
-
Amazon S3 中的升級摘要 – Amazon S3 中升級摘要的位置。您可以在套用升級之前檢視升級摘要。
-
-
當升級分析成功完成時,您可以選擇套用升級指令碼,以套用升級指令碼來自動升級任務。
套用後,Glue AWS 版本將更新為 4.0。您可以在指令碼索引標籤中檢視新的指令碼。
了解您的升級摘要
此範例示範將 AWS Glue 任務從 2.0 版升級至 4.0 版的程序。範例任務會從 Amazon S3 儲存貯體讀取產品資料,使用 Spark SQL 將數個轉換套用至資料,然後將轉換後的結果儲存回 Amazon S3 儲存貯體。
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.job import Job
import json
from pyspark.sql.types import StructType
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
gdc_database = "s3://aws-glue-scripts-us-east-1-gamma/demo-database/"
schema_location = (
"s3://aws-glue-scripts-us-east-1-gamma/DataFiles/"
)
products_schema_string = spark.read.text(
f"{schema_location}schemas/products_schema"
).first()[0]
product_schema = StructType.fromJson(json.loads(products_schema_string))
products_source_df = (
spark.read.option("header", "true")
.schema(product_schema)
.option(
"path",
f"{gdc_database}products/",
)
.csv(f"{gdc_database}products/")
)
products_source_df.show()
products_temp_view_name = "spark_upgrade_demo_product_view"
products_source_df.createOrReplaceTempView(products_temp_view_name)
query = f"select {products_temp_view_name}.*, format_string('%0$s-%0$s', category, subcategory) as unique_category from {products_temp_view_name}"
products_with_combination_df = spark.sql(query)
products_with_combination_df.show()
products_with_combination_df.createOrReplaceTempView(products_temp_view_name)
product_df_attribution = spark.sql(
f"""
SELECT *,
unbase64(split(product_name, ' ')[0]) as product_name_decoded,
unbase64(split(unique_category, '-')[1]) as subcategory_decoded
FROM {products_temp_view_name}
"""
)
product_df_attribution.show()
product_df_attribution.write.mode("overwrite").option("header", "true").option(
"path", f"{gdc_database}spark_upgrade_demo_product_agg/"
).saveAsTable("spark_upgrade_demo_product_agg", external=True)
spark_upgrade_demo_product_agg_table_df = spark.sql(
f"SHOW TABLE EXTENDED in default like 'spark_upgrade_demo_product_agg'"
)
spark_upgrade_demo_product_agg_table_df.show()
job.commit()
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.job import Job
import json
from pyspark.sql.types import StructType
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# change 1
spark.conf.set("spark.sql.adaptive.enabled", "false")
# change 2
spark.conf.set("spark.sql.legacy.pathOptionBehavior.enabled", "true")
job = Job(glueContext)
gdc_database = "s3://aws-glue-scripts-us-east-1-gamma/demo-database/"
schema_location = (
"s3://aws-glue-scripts-us-east-1-gamma/DataFiles/"
)
products_schema_string = spark.read.text(
f"{schema_location}schemas/products_schema"
).first()[0]
product_schema = StructType.fromJson(json.loads(products_schema_string))
products_source_df = (
spark.read.option("header", "true")
.schema(product_schema)
.option(
"path",
f"{gdc_database}products/",
)
.csv(f"{gdc_database}products/")
)
products_source_df.show()
products_temp_view_name = "spark_upgrade_demo_product_view"
products_source_df.createOrReplaceTempView(products_temp_view_name)
# change 3
query = f"select {products_temp_view_name}.*, format_string('%1$s-%1$s', category, subcategory) as unique_category from {products_temp_view_name}"
products_with_combination_df = spark.sql(query)
products_with_combination_df.show()
products_with_combination_df.createOrReplaceTempView(products_temp_view_name)
# change 4
product_df_attribution = spark.sql(
f"""
SELECT *,
try_to_binary(split(product_name, ' ')[0], 'base64') as product_name_decoded,
try_to_binary(split(unique_category, '-')[1], 'base64') as subcategory_decoded
FROM {products_temp_view_name}
"""
)
product_df_attribution.show()
product_df_attribution.write.mode("overwrite").option("header", "true").option(
"path", f"{gdc_database}spark_upgrade_demo_product_agg/"
).saveAsTable("spark_upgrade_demo_product_agg", external=True)
spark_upgrade_demo_product_agg_table_df = spark.sql(
f"SHOW TABLE EXTENDED in default like 'spark_upgrade_demo_product_agg'"
)
spark_upgrade_demo_product_agg_table_df.show()
job.commit()

根據摘要,Glue AWS 提議了四種變更,以便成功將指令碼從 AWS Glue 2.0 升級到 AWS Glue 4.0:
-
Spark SQL 組態 (spark.sql.adaptive.enabled):此變更是還原應用程式行為,作為從 Spark 3.2 開始推出 Spark SQL 適應性查詢執行的新功能。您可以檢查此組態變更,並根據其偏好設定進一步啟用或停用它。
-
DataFrame API 變更:路徑選項無法與其他 DataFrameReader 操作共存,例如
load()
。若要保留先前的行為, AWS Glue 已更新指令碼以新增新的 SQL 組態 (spark.sql.legacy.pathOptionBehavior.enabled)。 -
Spark SQL API 變更:
strfmt
中 的行為format_string(strfmt, obj, ...)
已更新為不允許0$
做為第一個引數。為了確保相容性, AWS Glue 已改為修改指令碼以1$
用作第一個引數。 -
Spark SQL API 變更:
unbase64
函數不允許格式不正確的字串輸入。若要保留先前的行為, AWS Glue 已更新指令碼以使用try_to_binary
函數。
停止進行中的升級分析
您可以取消進行中的升級分析,或直接停止分析。
-
選擇升級分析索引標籤。
-
選取正在執行的任務,然後選擇停止。這將停止分析。然後,您可以在相同的任務上執行另一個升級分析。
考量事項
當您在預覽期間開始使用 Spark 升級時,需要考慮幾個重要層面,以獲得最佳的服務使用。
-
服務範圍和限制:預覽版著重於從 AWS Glue 2.0 版升級至 4.0 版的 PySpark 程式碼。目前,服務會處理不依賴其他程式庫相依性的 PySpark 程式碼。您可以在 AWS 帳戶中同時執行最多 10 個任務的自動升級,讓您有效率地升級多個任務,同時維持系統穩定性。
-
僅支援 PySpark 任務。
-
升級分析會在 24 小時後逾時。
-
一個任務一次只能執行一個作用中的升級分析。在帳戶層級上,最多可以同時執行 10 個作用中的升級分析。
-
-
在升級程序期間最佳化成本:由於 Spark 升級使用生成式 AI 透過多次反覆運算來驗證升級計畫,因此在帳戶中以 Glue AWS 任務身分執行每個反覆運算,因此最佳化驗證任務執行組態以提高成本效益至關重要。若要達成此目的,我們建議您在啟動升級分析時指定執行組態,如下所示:
-
使用非生產開發人員帳戶,並選取代表生產資料,但大小較小的範例模擬資料集,以透過 Spark 升級進行驗證。
-
使用適當大小的運算資源,例如 G.1X 工作者,以及選擇適當數量的工作者來處理範例資料。
-
適用時啟用 AWS Glue 任務自動擴展,以根據工作負載自動調整資源。
例如,如果您的生產任務使用 20 G.2X 工作者處理 TB 的資料,您可以設定升級任務以處理數 GB 的代表性資料,其中 2 G.2X 工作者和啟用自動擴展以進行驗證。
-
-
預覽最佳實務:在預覽期間,強烈建議您開始使用非生產任務進行升級。此方法可讓您熟悉升級工作流程,並了解服務如何處理不同類型的 Spark 程式碼模式。
-
警示和通知:在任務上使用生成式 AI 升級功能時,請確保關閉失敗任務執行的警示/通知。在升級過程中,在提供升級成品之前,您的帳戶中最多可能會有 10 個失敗的任務執行。
-
異常偵測規則:關閉正在升級的任務上的任何異常偵測規則,因為在升級驗證進行期間,在中繼任務執行期間寫入輸出資料夾的資料可能不是預期的格式。
Spark 升級中的跨區域推論
Spark 升級是由 提供支援, Amazon Bedrock 並利用跨區域推論 (CRIS)。透過 CRIS,Spark 升級會自動選取您地理位置內的最佳區域 (如此處更詳細描述),以處理您的推論請求、最大化可用的運算資源和模型可用性,並提供最佳的客戶體驗。使用跨區域推論無需額外費用。
跨區域推論請求會保留在原始資料所在地理位置的 AWS 區域中。例如,在美國提出的請求會保留在美國的 AWS 區域內。雖然資料只會存放在主要區域,但使用跨區域推論時,您的輸入提示和輸出結果可能會移動到主要區域之外。所有資料將透過 Amazon 安全網路加密傳輸。