功能商店功能处理器 SDK - Amazon SageMaker

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

功能商店功能处理器 SDK

通过使用 @feature_processor 装饰器装饰您的转换函数来声明 Feature Store 特征处理器定义。f SageMaker SDK or Python (Boto3) 会自动从配置的输入数据源加载数据,应用装饰后的转换函数,然后将转换后的数据摄取到目标特征组。经过修饰的转换函数必须符合 @feature_processor 装饰器的预期签名。有关@feature_processor装饰器的更多信息,请参阅 Amazon Feature St ore 中的 @feature_processor Dec or SageMaker ator 阅读文档。

使用@feature_processor装饰器,您的转换函数在 Spark 运行时环境中运行,在该环境中,提供给您的函数的输入参数及其返回值都是 Spark DataFrames。转换函数中的输入参数数量必须与 @feature_processor 装饰器中配置的输入数量相匹配。

有关@feature_processor装饰器的更多信息,请参阅适用于 Python 的功能处理器功能库 SDK (Boto3)

以下代码是有关如何使用 @feature_processor 装饰器的基本示例。如需更具体的示例使用案例,请参阅常见使用案例的特征处理代码示例

SDK可以使用以下命令从 SageMaker Python SDK 及其附加组件安装功能处理器。

pip install sagemaker[feature-processor]

在以下示例中,us-east-1 是资源所在的区域,111122223333 是资源拥有者账户 ID,your-feature-group-name 是特征组名称。

以下是基本的功能处理器定义,其中@feature_processor装饰器将来自 Amazon S3 的CSV输入配置为加载并提供给您的转换函数(例如transform),并准备将其提取到功能组。最后一行运行该函数。

from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/') OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' @feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG) def transform(csv_input_df): return csv_input_df transform()

@feature_processor 参数包括:

  • inputs (List[str]):Feature Store 特征处理器中使用的数据源列表。如果数据源是特征组或存储在 Amazon S3 中,您可以将 Feature Store 提供的数据源定义用于特征处理器。有关功能商店提供的数据源定义的完整列表,请参阅 Amazon Feature Store 中的 SageMaker 功能处理器数据源阅读文档。

  • output(str):用于摄取装饰函数输出的功能组的。ARN

  • target_stores (Optional[List[str]]):要摄取到输出中的存储(例如 OnlineStoreOfflineStore)列表。如果未指定该参数,则数据将摄取到输出特征组的所有已启用存储。

  • parameters (Dict[str, Any]):要提供给转换函数的字典。

  • enable_ingestion (bool):一个标志,用于指示转换函数的输出是否摄取到输出特征组。此标志在开发阶段很有用。如果未指定该标志,则会启用摄取。

可选的封装函数参数(如果在函数签名中提供,则作为参数提供)包括:

  • params (Dict[str, Any]):@feature_processor 参数中定义的字典。它还包含可使用键 system 引用的系统配置参数,例如 scheduled_time 参数。

  • spark(SparkSession):对为 Spark 应用程序初始化的 SparkSession 实例的引用。

以下代码是使用 paramsspark 参数的示例。

from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/') OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' @feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG) def transform(csv_input_df, params, spark): scheduled_time = params['system']['scheduled_time'] csv_input_df.createOrReplaceTempView('csv_input_df') return spark.sql(f''' SELECT * FROM csv_input_df WHERE date_add(event_time, 1) >= {scheduled_time} ''') transform()

scheduled_time 系统参数(在函数的 params 参数中提供)是支持重试每次执行的重要值。该值可帮助唯一标识特征处理器的执行情况,并可用作基于日期范围的输入(例如,仅加载最近 24 小时的数据)的参考点,以保证输入范围独立于代码的实际执行时间。如果特征处理器按计划运行(请参阅特征处理器管道的计划执行和基于事件的执行),则其值固定为计划运行的时间。在同步执行期间,可以使用 SDK's execute来覆盖该参数,API以支持诸如数据回填或重新运行过去错过的执行之类的用例。如果特征处理器以任何其他方式运行,则其值为当前时间。

有关编写 Spark 代码的信息,请参阅《Spark SQL 编程指南》

有关常见使用案例的更多代码示例,请参阅常见使用案例的特征处理代码示例

请注意,用 @feature_processor 进行修饰的转换函数不会返回值。要以编程方式对函数进行测试,可以删除 @feature_processor 装饰器或为其打上猴子补丁,使其充当封装函数的直通函数。有关@feature_processor装饰器的更多详细信息,请参阅亚马逊 SageMaker 功能商店 Python SDK