Feature processing code examples for common use cases
The following examples provide feature processing code for common use cases. For more detailed example notebooks that showcase specific use cases, see the Amazon SageMaker Feature Store Feature Processing notebook.
In the following examples, us-east-1 is the Region of the resource, 111122223333 is the resource owner account ID, and your-feature-group-name is the name of the feature group.

The transactions dataset used in the following examples has this schema:
'FeatureDefinitions': [
    {'FeatureName': 'txn_id', 'FeatureType': 'String'},
    {'FeatureName': 'txn_time', 'FeatureType': 'String'},
    {'FeatureName': 'credit_card_num', 'FeatureType': 'String'},
    {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'}
]
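If you want to experiment with the transformations below outside of a feature processor, a DataFrame matching this schema can be built by hand. A minimal sketch; the rows are made-up sample data:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample rows matching the transactions schema above.
transactions_df = spark.createDataFrame(
    [
        ("t1", "2023-01-01 10:00:00", "4111111111111111", 100.0),
        ("t2", "2023-01-02 11:30:00", "4111111111111111", 25.5),
        ("t3", "2023-01-02 12:00:00", "5500000000000004", 73.2),
    ],
    ["txn_id", "txn_time", "credit_card_num", "txn_amount"],
)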
Joining data from multiple data sources
from sagemaker.feature_store.feature_processor import (
    feature_processor,
    CSVDataSource,
    FeatureGroupDataSource,
)

@feature_processor(
    inputs=[
        CSVDataSource('s3://bucket/customer'),
        FeatureGroupDataSource('transactions')
    ],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def join(transactions_df, customer_df):
    '''Combine two data sources with an inner join on a common column.'''
    return transactions_df.join(
        customer_df,
        transactions_df.customer_id == customer_df.customer_id,
        "inner"
    )
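A decorated function like this can also be promoted to a SageMaker Pipeline and run on a schedule. A minimal sketch, assuming the feature groups and S3 data above exist; the pipeline name is a made-up placeholder:

from sagemaker.feature_store.feature_processor import to_pipeline, schedule

# Register the decorated function as a SageMaker Pipeline (hypothetical name).
pipeline_arn = to_pipeline(pipeline_name="transactions-join", step=join)

# Run the pipeline once a day.
schedule(pipeline_name="transactions-join", schedule_expression="rate(1 day)")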
Sliding window aggregations
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def sliding_window_aggregates(transactions_df):
    '''Aggregates over 1-week windows, across 1-day sliding windows.'''
    from pyspark.sql.functions import window, avg, count

    return (
        transactions_df
        .groupBy("credit_card_num", window("txn_time", "1 week", "1 day"))
        .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week"))
        .orderBy("window.start")
        .select("credit_card_num", "window.start", "avg_week", "count_week")
    )
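To inspect what the sliding windows produce, the same aggregation can be run locally against the hand-built transactions_df from the sketch above:

from pyspark.sql.functions import window, avg, count, to_timestamp

# txn_time is a String in the feature group schema, so cast it for windowing.
weekly = (
    transactions_df
    .groupBy("credit_card_num", window(to_timestamp("txn_time"), "1 week", "1 day"))
    .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week"))
)

# With a 1-day slide over 1-week windows, each transaction falls into
# 7 overlapping windows and therefore appears in 7 output rows.
weekly.orderBy("window.start").show(truncate=False)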
Tumbling window aggregations
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def tumbling_window_aggregates(transactions_df, spark):
    '''Aggregates over 1-week tumbling windows, as a SQL query.'''
    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql('''
        SELECT credit_card_num, window.start, AVG(txn_amount) AS avg, COUNT(*) AS count
        FROM transactions
        GROUP BY credit_card_num, window(txn_time, "1 week")
        ORDER BY window.start
    ''')
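The same aggregation can be written with the DataFrame API instead of SQL; a sketch, equivalent under the schema above:

from pyspark.sql.functions import window, avg, count, to_timestamp

# Omitting the slide duration makes window() produce tumbling
# (non-overlapping) 1-week windows.
weekly = (
    transactions_df
    .groupBy("credit_card_num", window(to_timestamp("txn_time"), "1 week"))
    .agg(avg("txn_amount").alias("avg"), count("*").alias("count"))
    .orderBy("window.start")
)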
Promoting data from the offline store to the online store
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def offline_to_online(transactions_df, spark):
    '''Move data from the offline store to the online store of the same feature group.'''
    transactions_df.createOrReplaceTempView('transactions')

    # Keep only the latest record per txn_id, preferring the most recent
    # event time, API invocation time, and write time.
    return spark.sql('''
        SELECT txn_id, txn_time, credit_card_num, txn_amount
        FROM (
            SELECT *,
                   row_number() OVER (
                       PARTITION BY txn_id
                       ORDER BY txn_time DESC, api_invocation_time DESC, write_time DESC
                   ) AS row_number
            FROM transactions
        )
        WHERE row_number = 1
    ''')
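A sketch of running this promotion on demand, assuming the decorated function is first registered as a pipeline; the pipeline name is a made-up placeholder:

from sagemaker.feature_store.feature_processor import to_pipeline, execute

# Register the decorated function as a SageMaker Pipeline (hypothetical name),
# then start a single execution on demand.
to_pipeline(pipeline_name="offline-to-online", step=offline_to_online)
execute(pipeline_name="offline-to-online")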
Transformations using the Pandas library
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def pandas(transactions_df):
    '''Author transformations using the Pandas interface.

    Requires PyArrow to be installed via pip. For more details:
    https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark
    '''
    import pyspark.pandas as ps

    # PySpark DF to Pandas-On-Spark DF (distributed DF with a Pandas interface).
    pandas_on_spark_df = transactions_df.pandas_api()

    # Pandas-On-Spark DF to Pandas DF (single machine only).
    pandas_df = pandas_on_spark_df.to_pandas()

    # Reverse: Pandas DF to Pandas-On-Spark DF.
    pandas_on_spark_df = ps.from_pandas(pandas_df)

    # Reverse: Pandas-On-Spark DF to PySpark DF.
    spark_df = pandas_on_spark_df.to_spark()

    return spark_df
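The example above only round-trips between representations. A sketch of an actual Pandas-style transformation on the distributed DataFrame; the derived column and function name are made up:

def add_amount_zscore(transactions_df):
    '''Derive a hypothetical z-score feature with Pandas syntax.'''
    # Stay on the Pandas-On-Spark DF so the computation remains distributed.
    psdf = transactions_df.pandas_api()
    mean = psdf["txn_amount"].mean()
    std = psdf["txn_amount"].std()
    psdf["txn_amount_zscore"] = (psdf["txn_amount"] - mean) / std
    return psdf.to_spark()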
Continuous executions and automatic retries using event-based triggers
from sagemaker.feature_store.feature_processor import (
    put_trigger,
    to_pipeline,
    FeatureProcessorPipelineEvent,
    FeatureProcessorPipelineExecutionStatus,
)

streaming_pipeline_name = "target-pipeline"

# `transform` is a function decorated with @feature_processor,
# such as the examples above.
to_pipeline(
    pipeline_name=streaming_pipeline_name,
    step=transform
)

# Re-run the pipeline whenever one of its own executions stops or fails.
put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name=streaming_pipeline_name,
            pipeline_execution_status=[
                FeatureProcessorPipelineExecutionStatus.STOPPED,
                FeatureProcessorPipelineExecutionStatus.FAILED
            ]
        )
    ],
    target_pipeline=streaming_pipeline_name
)