使用亚马逊 Feature Store 批量摄取 Sp SageMaker ark - Amazon SageMaker

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用亚马逊 Feature Store 批量摄取 Sp SageMaker ark

Amazon F SageMaker eature Store Spark 是一个 Spark 连接器,它将 Spark 库连接到功能商店。Feature Store Spark 简化了从 Spark DataFrame 到特征组的数据摄取流程。Feature Store 支持在亚马逊上使用现有ETL管道、 AWS Glue 作业EMRGIS、亚马逊 SageMaker 处理任务或笔记本使用 Spark 进行批量数据摄取。 SageMaker

我们为 Python 和 Scala 开发人员提供了安装和实施批量数据摄取的方法。Python 开发者可以按照亚马逊 SageMaker 功能商店 Spark 存储库中的说明使用开源 sagemaker-feature-store-pyspark Python 库进行本地开发EMR、在亚马逊上安装以及 Jupyter Notebook GitHub。Scala 开发人员可以使用亚马逊功能商店 Spark SDK Maven 中央存储库中提供的 SageMaker 功能商店 Spark 连接器。

可以通过以下方式使用 Spark 连接器摄取数据,具体取决于是启用了在线存储、离线存储,还是两者均已启用。

  1. 默认采集 — 如果在线商店已启用,Spark connector 会首先使用将你的数据框提取到在线商店中。PutRecordAPI在线存储仅保留事件时间最长的记录。如果启用了离线存储,Feature Store 将在 15 分钟内将您的数据框提取到离线存储中。有关在线和离线存储工作原理的更多信息,请参阅 Feature Store 概念

    您可以通过不在 .ingest_data(...) 方法中指定 target_stores 来完成此操作。

  2. 离线存储直接摄取 - 如果启用了离线存储,Spark 连接器会将您的数据框直接批量摄取到离线存储中。将数据框直接摄取到离线存储并不会更新在线存储。

    您可以通过在 .ingest_data(...) 方法中设置 target_stores=["OfflineStore"] 来完成此操作。

  3. 仅限在线商店 — 如果启用了在线商店,Spark connector 会使用将您的数据框提取到在线商店中。PutRecordAPI将数据框直接摄取到在线存储并不会更新离线存储。

    您可以通过在 .ingest_data(...) 方法中设置 target_stores=["OnlineStore"] 来完成此操作。

有关使用不同摄取方法的信息,请参阅示例实施

Feature Store Spark 安装

Scala 用户

Feature Store Spark SDK 可在亚马逊 F SageMaker eature Store Spark SDK Maven 中央存储库中获得,供 Scala 用户使用。

要求

  • Spark >= 3.0.0 且 <= 3.3.0

  • iceberg-spark-runtime >= 0.14.0

  • Scala >= 2.12.x 

  • 亚马逊 EMR >= 6.1.0(仅当你使用亚马逊时)EMR

在 POM .xml 中声明依赖关系

Feature Store Spark 连接器在 iceberg-spark-runtime 库中有一个依赖项。因此,如果要将数据摄取到使用 Iceberg 表格式自动创建的特征组中,则必须将相应版本的 iceberg-spark-runtime 库添加到该依赖项中。例如,如果使用的是 Spark 3.1,则必须在项目的 POM.xml 中声明以下内容:

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Python 用户

Feature Stor SDK e Spark 可在开源亚马逊 SageMaker 功能商店 Spark GitHub 存储库中找到。

要求

  • Spark >= 3.0.0 且 <= 3.3.0

  • 亚马逊 EMR >= 6.1.0(仅当你使用亚马逊时)EMR

  • 内核 = conda_python3

我们建议将 $SPARK_HOME 设置为安装了 Spark 的目录。在安装过程中,Feature Store 会JAR将所需的内容上传到SPARK_HOME,这样依赖关系就会自动加载。需要启动 Spark 才能使该 PySpark 库正常工作。JVM

本地安装

要查找有关安装的更多信息,请通过将 --verbose 附加到以下安装命令来启用详细模式。

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

在亚马逊上安装 EMR

使用 6.1.0 或更高版本创建一个 Amazon EMR 集群。启用SSH可帮助您解决任何问题。

可以执行以下操作之一来安装库:

  • 在 Amazon 中创建自定义步骤EMR。

  • 使用连接到您的集群SSH,然后从那里安装库。

注意

以下信息使用 Spark 版本 3.1,但您可以指定满足要求的任何版本。

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
注意

如果要将依赖项JARs自动安装到 SPARK _HOME,请不要使用 bootstrap 步骤。

在 SageMaker 笔记本实例上安装

使用以下命令安装 PySpark 与 Spark 连接器兼容的版本:

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

如果您要对离线存储执行批量摄取,则依赖项不在笔记本实例环境中。

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

在笔记本电脑上安装 GIS

重要

必须使用 2.0 或更高 AWS Glue 版本。

使用以下信息帮助您在 AWS Glue 交互式会话中安装 PySpark 连接器(GIS)。

Amaz SageMaker on Feature Store Spark 需要在会话初始化JAR期间使用特定的 Spark 连接器上传到您的 Amazon S3 存储桶。有关将所需JAR内容上传到 S3 存储桶的更多信息,请参阅正在检索 Feature Store Spark JAR 的

上传后JAR,必须JAR使用以下命令为GIS会话提供信息。

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

要在 AWS Glue 运行时安装 Feature Store Spark,请在GIS笔记本中使用%additional_python_modules神奇的命令。 AWS Glue 运行pip到您在下指定的模块%additional_python_modules

%additional_python_modules sagemaker-feature-store-pyspark-3.1

在开始会 AWS Glue 话之前,必须使用前面的两个神奇命令。

在 AWS Glue 工作中安装

重要

必须使用 2.0 或更高 AWS Glue 版本。

要在 AWS Glue 作业上安装 Spark 连接器,请使用--extra-jars参数提供必要的信息,JARs并在--additional-python-modules创建作业时将 Spark Connector 安装为 AWS Glue 作业参数,如下例所示。有关将所需JAR内容上传到 S3 存储桶的更多信息,请参阅正在检索 Feature Store Spark JAR 的

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

在 Amazon SageMaker 处理任务上安装

要在亚马逊 SageMaker 处理任务中使用 Feature Store Spark,请自带图片。有关自带映像的更多信息,请参阅带上你自己的 SageMaker 图片。向 Dockerfile 添加安装步骤。将 Docker 映像推送到 Amazon ECR 存储库后,您可以使用 PySparkProcessor 来创建处理任务。有关使用处理 PySpark 器创建处理任务的更多信息,请参阅使用 Apache Spark 运行处理作业

以下是向 Dockerfile 添加安装步骤的示例。

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

正在检索 Feature Store Spark JAR 的

要检索 Feature Store Spark 依赖关系JAR,您必须在任何具有网络访问权限的 Python 环境中使用从 Python 包索引 (PyPI) 存储库pip中安装 Spark 连接器。 SageMaker Jupyter 笔记本就是一个具有网络访问权限的 Python 环境的示例。

以下命令用于安装 Spark 连接器。

!pip install sagemaker-feature-store-pyspark-3.1

安装 Feature Store Spark 后,您可以检索JAR位置并将其上传JAR到 Amazon S3。

feature-store-pyspark-dependency-jars命令提供了 Feature Store Spark 添加JAR的必要依赖项的位置。您可以使用命令检索JAR并将其上传到 Amazon S3。

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

示例实施

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

使用示例 Python 脚本提交 Spark 作业

该 PySpark 版本需要导入额外的依赖JAR项,因此需要额外的步骤来运行 Spark 应用程序。

如果您SPARK_HOME在安装期间未指定,则必须在运行JVM时按要求JARs进行加载spark-submitfeature-store-pyspark-dependency-jars是 Spark 库安装的 Python 脚本,JARs用于自动为你获取所有路径。

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

如果您在 Amazon 上运行此应用程序EMR,我们建议您在客户端模式下运行该应用程序,这样您就无需将依赖项分配JARs给其他任务节点。使用 Spark 参数在 Amazon EMR 集群中再添加一个步骤,如下所示:

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion.scala

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

提交 Spark 作业

Scala

您应该能够将 Feature Store Spark 作为一个正常的依赖项来使用。无需额外的指令即可在所有平台上运行该应用程序。