使用 Amazon SageMaker Feature Store Spark 的批次擷取 - Amazon SageMaker

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Amazon SageMaker Feature Store Spark 的批次擷取

Amazon SageMaker Feature Store Spark 是 Spark 連接器,可將 Spark 程式庫連接至 Feature Store。特徵商店 Spark 簡化了從 Spark DataFrame 到特徵群組的資料擷取作業。Feature Store 使用您現有的ETL管道,在 Amazon EMR、、GIS AWS Glue 任務、Amazon SageMaker 處理任務或 SageMaker 筆記本上支援使用 Spark 的批次資料擷取。

為 Python 和 Scala 開發人員提供了用於安裝和實施批次資料擷取的方法。Python 開發人員可以依照 Amazon Feature Store Spark 儲存庫 中的說明,使用開放原始碼 sagemaker-feature-store-pyspark Python 程式庫進行本機開發EMR、在 Amazon 上安裝,以及針對 Jupyter Notebooks 進行安裝。 SageMaker GitHub Scala 開發人員可以使用 Amazon Feature Store Spark SDK Maven 中央儲存庫 中提供的 SageMaker Feature Store Spark 連接器。

您可以使用 Spark 連接器以下列方式擷取資料,視線上儲存、離線儲存或兩者是否啟用而定。

  1. 根據預設擷取 – 如果已啟用線上儲存,Spark 連接器會先使用 將資料架構擷取至線上儲存PutRecordAPI。只有活動時間最長的記錄保留在線儲存中。如果已啟用離線存放區,則功能儲存會在 15 分鐘內將您的資料框擷取至離線存放區。如需線上和離線儲存運作方式的詳細資訊,請參閱功能儲存概念

    您可以通過不在.ingest_data(...)方法中指定target_stores來完成此操作。

  2. 離線儲存區直接擷取 — 如果啟用離線存放區,Spark 連接器批次會將您的資料框直接擷取至離線存放區。將資料框直接導入離線儲存並不會更新線上儲存。

    您可以通過在.ingest_data(...)方法中設置 target_stores=["OfflineStore"] 來完成此操作。

  3. 僅限線上商店 – 如果已啟用線上商店,Spark 連接器會使用 將資料架構擷取到線上商店PutRecordAPI。將資料框直接導入線上儲存並不會更新離線儲存。

    您可以通過在.ingest_data(...)方法中設置 target_stores=["OnlineStore"] 來完成此操作。

如需不同擷取方法的詳細資訊,請參閱實作範例

功能儲存 Spark 安裝

Scala 使用者

適用於 Scala 使用者的 Amazon SageMaker Feature Store Spark SDK Maven 中央儲存庫SDK中提供 Feature Store Spark。

需求

  • Spark >= 3.0.0 和 <= 3.3.0

  • iceberg-spark-runtime >= 0.14.0

  • Scala >= 2.12.x 

  • Amazon EMR >= 6.1.0 (僅當您使用 Amazon 時EMR)

宣告 POM.xml 中的相依性

特徵商店 Spark 連接器具有 iceberg-spark-runtime 程式庫的相依性。因此,如果您要將資料擷取到已使用 Iceberg 資料表格式自動建立的功能群組中,則必須將相應版本的 iceberg-spark-runtime程式庫新增至相依性。例如,如果您使用的是 Spark 3.1,則必須在您的項目中聲明以下內容POM.xml

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Python 使用者

Feature Store Spark SDK可在開放原始碼 Amazon SageMaker Feature Store Spark GitHub 儲存庫 中使用。

需求

  • Spark >= 3.0.0 和 <= 3.3.0

  • Amazon EMR >= 6.1.0 (僅當您使用 Amazon 時EMR)

  • 核心 = conda_python3

我們建議將 $SPARK_HOME 設定為 Spark 安裝目錄。在安裝期間, Feature Store 會將所需的 上傳至 JAR SPARK_HOME,以便相依性自動載入。啟動 JVM需要 Spark 才能讓此 PySpark 程式庫運作。

本機安裝

若要尋找有關安裝的詳細資訊,請在以下安裝中新增 --verbose 以啟用詳細模式。

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

在 Amazon 上安裝 EMR

使用 6.1.0 版或更新版本建立 Amazon EMR叢集。啟用 SSH以協助您疑難排解任何問題。

您可以執行下列操作之一來安裝該資料庫:

  • 在 Amazon 中建立自訂步驟EMR。

  • 使用 連線至叢集,SSH並從該處安裝程式庫。

注意

下列資訊使用 Spark 3.1 版,但您可以指定符合需求的任何版本。

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
注意

如果您想要將相依項目JARs自動安裝至 SPARK_HOME,請勿使用引導步驟。

在 SageMaker 筆記本執行個體上安裝

使用以下命令安裝與 Spark 連接器 PySpark 相容的 版本:

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

如果您要對離線存放區執行批次擷取,則相依性不在筆記本執行個體環境中。

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

在具有 的筆記本上安裝 GIS

重要

您必須使用 2 AWS Glue .0 版或更新版本。

使用下列資訊協助您在互動式工作階段 () AWS Glue 中安裝 PySpark 連接器GIS。

Amazon SageMaker Feature Store Spark 在初始化工作階段JAR期間需要特定的 Spark 連接器,才能上傳至您的 Amazon S3 儲存貯體。如需將所需 上傳至 JAR S3 儲存貯體的詳細資訊,請參閱 擷取 Feature Store Spark JAR的

上傳 之後JAR,您必須JAR使用下列命令為GIS工作階段提供 。

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

若要在 AWS Glue 執行階段安裝 Feature Store Spark,請使用 GIS notebook. AWS Glue runs 中的%additional_python_modules魔術命令pip,以執行您在 下指定的模組%additional_python_modules

%additional_python_modules sagemaker-feature-store-pyspark-3.1

開始 AWS Glue 工作階段之前,您必須使用上述兩個魔術命令。

在 AWS Glue 任務上安裝

重要

您必須使用 2 AWS Glue .0 版或更新版本。

若要在 AWS Glue 任務上安裝 Spark 連接器,請使用 --extra-jars 引數提供必要的 --additional-python-modules JARs,並在建立 AWS Glue 任務時將 Spark 連接器安裝為任務參數,如下列範例所示。如需將所需 上傳至 JAR S3 儲存貯體的詳細資訊,請參閱 擷取 Feature Store Spark JAR的

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

在 Amazon SageMaker Processing 任務上安裝

若要將 Feature Store Spark 與 Amazon SageMaker 處理任務搭配使用,請攜帶您自己的映像。有關使用映像的更多資訊,請參閱攜帶您自己的 SageMaker 映像。將安裝步驟新增到 Docker 文件中。將 Docker 映像推送至 Amazon ECR儲存庫之後,您可以使用 PySparkProcessor 建立處理任務。如需使用 PySpark 處理器建立處理任務的詳細資訊,請參閱 使用 Apache Spark 執行處理任務

以下是將安裝步驟新增至 Dockerfile 的範例。

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

擷取 Feature Store Spark JAR的

若要擷取 Feature Store Spark 相依性 JAR,您必須在具有網路存取權的任何 Python 環境中pip,使用 從 Python 套件索引 PyPI) 儲存庫安裝 Spark 連接器。 SageMaker Jupyter Notebook 是具有網路存取的 Python 環境範例。

下列指令會安裝 Spark 連接器。

!pip install sagemaker-feature-store-pyspark-3.1

安裝 Feature Store Spark 後,您可以擷取JAR位置並將 JAR 上傳至 Amazon S3。

feature-store-pyspark-dependency-jars命令會提供 JAR Feature Store Spark 新增的必要相依性位置。您可以使用 命令來擷取 JAR 並將其上傳至 Amazon S3。

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

實作範例

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

使用 Python 指令碼範例提交 Spark 工作

PySpark 版本需要JAR匯入額外的相依性,因此執行 Spark 應用程式需要額外的步驟。

如果您在安裝SPARK_HOME期間未指定 ,則您必須在執行 JVM時載入必要的 JARs spark-submitfeature-store-pyspark-dependency-jars是 Spark 程式庫安裝的 Python 指令碼,以自動JARs為您擷取所有 的路徑。

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

如果您是在 Amazon 上執行此應用程式EMR,建議您以用戶端模式執行應用程式,這樣您就不需要將相依項目分發JARs給其他任務節點。使用類似下列的 Spark 引數在 Amazon EMR叢集中新增一個步驟:

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion.scala

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

提交一個 Spark 工作

Scala

您應該能夠使用功能儲存 Spark 作為正常依賴關係。在所有平台上執行應用程式不需要額外的指令。