使用 Amazon SageMaker 功能儲存 Spark 進行 Batch 擷取 - Amazon SageMaker AI

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Amazon SageMaker 功能儲存 Spark 進行 Batch 擷取

Amazon SageMaker 功能儲存 Spark 是 Spark 資料庫連接到功能儲存的 Spark 連接器。特徵商店 Spark 簡化了從 Spark DataFrame 到特徵群組的資料擷取作業。Feature Store 使用您現有的 ETL 管道,在 Amazon EMR、GIS、 AWS Glue 任務、Amazon SageMaker 處理任務或 SageMaker 筆記本上支援使用 Spark 的批次資料擷取。

為 Python 和 Scala 開發人員提供了用於安裝和實施批次資料擷取的方法。Python 開發人員可以使用開放原始碼 sagemaker-feature-store-pyspark Python 程式庫進行本機開發、安裝在Amazon EMR 上,以及 Jupyter 筆記本,方法是遵循 Amazon SageMaker Feature Store Spark GitHub 儲存庫中的指示。斯卡拉開發人員可以使用功能儲存Spark連接器可在 Amazon SageMaker Feature Store Spark SDK Maven central 儲存庫

您可以使用 Spark 連接器以下列方式擷取資料,視線上儲存、離線儲存或兩者是否啟用而定。

  1. 預設情況下擷取 — 如果啟用了線上儲存,Spark 連接器會先使用 PutRecord API 將您的資料框擷取至線上儲存。只有活動時間最長的記錄保留在線儲存中。如果已啟用離線存放區,則功能儲存會在 15 分鐘內將您的資料框擷取至離線存放區。如需線上和離線儲存運作方式的詳細資訊,請參閱功能儲存概念

    您可以通過不在.ingest_data(...)方法中指定target_stores來完成此操作。

  2. 離線儲存區直接擷取 — 如果啟用離線存放區,Spark 連接器批次會將您的資料框直接擷取至離線存放區。將資料框直接導入離線儲存並不會更新線上儲存。

    您可以通過在.ingest_data(...)方法中設置 target_stores=["OfflineStore"] 來完成此操作。

  3. 僅限線上儲存 — 如果啟用了線上儲存,Spark 連接器會使用 PutRecord API 將您的資料框擷取至線上儲存。將資料框直接導入線上儲存並不會更新離線儲存。

    您可以通過在.ingest_data(...)方法中設置 target_stores=["OnlineStore"] 來完成此操作。

如需不同擷取方法的詳細資訊,請參閱實作範例

功能儲存 Spark 安裝

Scala 使用者

特徵商店 Spark SDK 開放提供 Scala 使用者運用,請前往所在位置 Amazon SageMaker Feature Store Spark SDK Maven central 儲存庫

需求

  • Spark >= 3.0.0 和 <= 3.3.0

  • iceberg-spark-runtime >= 0.14.0

  • Scala >= 2.12.x 

  • Amazon EMR > = 6.1.0 (僅當您使用的是 Amazon EMR)

在 POM.xml 中聲明相依性關係

特徵商店 Spark 連接器具有 iceberg-spark-runtime 程式庫的相依性。因此,如果您要將資料擷取到已使用 Iceberg 資料表格式自動建立的功能群組中,則必須將相應版本的 iceberg-spark-runtime程式庫新增至相依性。例如,如果您使用的是 Spark 3.1,則必須在您的項目中聲明以下內容POM.xml

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Python 使用者

特徵商店 Spark SDK 可在開放原始碼的 Amazon SageMaker Feature Store Spark GitHub 儲存庫中使用。

需求

  • Spark >= 3.0.0 和 <= 3.3.0

  • Amazon EMR > = 6.1.0 (僅當您使用的是 Amazon EMR)

  • 核心 = conda_python3

我們建議將 $SPARK_HOME 設定為 Spark 安裝目錄。在安裝期間,特徵商店會將所需的 JAR 上傳至 SPARK_HOME,以便自動載入相依性。Spark啟動一個 JVM 需要使這個 PySpark 庫的工作。

本機安裝

若要尋找有關安裝的詳細資訊,請在以下安裝中新增 --verbose 以啟用詳細模式。

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

在 Amazon EMR 上安裝

使用 6.1.0 或更新版本建立 Amazon EMR 叢集。啟用 SSH 以協助您疑難排解任何問題。

您可以執行下列操作之一來安裝該資料庫:

  • 在 Amazon EMR 中建立自訂步驟。

  • 使用 SSH Connect 至叢集,然後從該處安裝資料庫。

注意

下列資訊使用 Spark 3.1 版,但您可以指定符合需求的任何版本。

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
注意

如果您想要將相依 JAR 自動安裝到 SPARK_HOME,請勿使用啟動程序步驟。

在 SageMaker 筆記本執行個體上安裝

使用下列指令安裝與 Spark 連接器相容的 PySpark 版本:

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

如果您要對離線存放區執行批次擷取,則相依性不在筆記本執行個體環境中。

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

使用 GIS 在筆記本上安裝

重要

您必須使用 2 AWS Glue .0 版或更新版本。

使用下列資訊以協助您在 AWS Glue 互動式工作階段 (GIS) 中安裝 PySpark 連接器。

Amazon SageMaker 功能存放區 Spark 需要在工作階段初始化期間使用特定的 Spark 連接器 JAR,才能上傳到您的 Amazon S3 儲存貯體。如需將影片上傳至 S3 來源儲存貯體的詳細資訊,請參閱檢索功能儲存 Spark 的 JAR

上傳 JAR 之後,您必須使用以下命令提供 JAR 的 GIS 工作階段。

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

若要在 AWS Glue 執行時間安裝 Feature Store Spark,請使用 GIS notebook. AWS Glue runs 中的%additional_python_modules魔術命令pip,以執行您在 下指定的模組%additional_python_modules

%additional_python_modules sagemaker-feature-store-pyspark-3.1

開始 AWS Glue 工作階段之前,您必須使用上述兩個魔術命令。

在 AWS Glue 任務上安裝

重要

您必須使用 2 AWS Glue .0 版或更新版本。

若要在 AWS Glue 任務上安裝 Spark 連接器,請使用 --extra-jars引數來提供必要的 JARs--additional-python-modules,並在建立 AWS Glue 任務時將 Spark 連接器安裝為任務參數,如下列範例所示。如需將影片上傳至 S3 來源儲存貯體的詳細資訊,請參閱檢索功能儲存 Spark 的 JAR

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

Amazon SageMaker Processing 作業上的安裝

若要將功能儲存 Spark 與 Amazon SageMaker 處理任務搭配使用,請攜帶您自己的影像。有關使用映像的更多資訊,請參閱攜帶您自己的 SageMaker AI 映像。將安裝步驟新增到 Docker 文件中。將泊塢視窗映像推送至 Amazon ECR 儲存庫之後,您可以使用 PySpark 處理器來建立處理任務。如需使用 PySpark 處理器建立處理任務的詳細資訊,請參閱使用 Apache Spark 執行處理任務

以下是將安裝步驟新增至 Dockerfile 的範例。

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

檢索功能儲存 Spark 的 JAR

要檢索特徵商店 Spark 相依性 JAR,您必須使用pip在任何 Python 環境與網路存取的 Python 套件索引 (PyPI) 儲存庫安裝 Spark 連接器。SageMaker Jupyter 筆記本是具有網路存取權的 Python 環境範例。

下列指令會安裝 Spark 連接器。

!pip install sagemaker-feature-store-pyspark-3.1

安裝特徵商店 Spark 之後,您可以擷取 JAR 位置,並將 JAR 上傳到 Amazon S3。

feature-store-pyspark-dependency-jars命令提供了特徵商店 Spark 新增的必要相依性 JAR 的位置。您可以使用命令擷取 JAR,然後將它上傳至 Amazon S3。

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

實作範例

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

使用 Python 指令碼範例提交 Spark 工作

PySpark 版本需要一個額外的依賴 JAR 進行匯入,因此需要額外的步驟來執行 Spark 應用程式。

如果您沒有在安裝過程中指定SPARK_HOME,則在執行 spark-submit 時必須在 JVM 中加載所需的 JAR。feature-store-pyspark-dependency-jars 是由 Spark 程式庫安裝的 Python 指令碼,以自動為您擷取所有 JAR 的路徑。

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

如果您在 Amazon EMR 上執行此應用程式,建議您以用戶端模式執行應用程式,這樣您就不需要將相依 JAR 散佈到其他任務節點。在Amazon EMR 群集中新增一個步驟,使用類似於以下的 Spark 引數:

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

功能庫批處理. 斯卡拉

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

提交一個 Spark 工作

Scala

您應該能夠使用功能儲存 Spark 作為正常依賴關係。在所有平台上執行應用程式不需要額外的指令。