本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
功能存放區功能處理器 SDK
透過使用@feature_processor
裝飾器裝飾轉換函式來宣告特徵商店特徵處理器定義。for Python SageMaker SDK(Boto3) 會自動從設定的輸入資料來源載入資料,套用裝飾轉換函數,然後將轉換的資料擷取至目標特徵群組。裝飾的轉換函式必須符合@feature_processor
裝飾器的預期簽名。如需@feature_processor
裝飾器的詳細資訊,請參閱 Amazon SageMaker Feature Store 中的 @feature_processor Decorator
使用@feature_processor
裝飾器時,轉換函數會在 Spark 執行期環境中執行,其中提供給函數的輸入引數及其傳回值為 Spark DataFrames。轉換函式中的輸入參數數目必須與@feature_processor
裝飾器中配置的輸入數目相符。
如需@feature_processor
裝飾器的詳細資訊,請參閱SDK適用於 Python 的 Feature Processor Feature Store (Boto3)
下面的代碼是有關如何使用@feature_processor
裝飾器的基本範例。如需更具體的使用案例範例,請參閱常見使用案例的特徵處理程式碼範例。
可以使用下列命令,從 SageMaker Python SDK及其額外項目SDK安裝功能處理器。
pip install sagemaker[feature-processor]
在以下範例中,
是資源的區域,us-east-1
是資源擁有者帳戶 ID,111122223333
是特徵群組名稱。your-feature-group-name
以下是基本特徵處理器定義,其中@feature_processor
裝飾器會設定要載入並提供給轉換函數 (例如 transform
) 的 Amazon S3 CSV輸入,並準備擷取至特徵群組。最後一行執行它。
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor CSV_DATA_SOURCE = CSVDataSource('s3://
your-bucket
/prefix-to-csv
/') OUTPUT_FG = 'arn:aws:sagemaker:us-east-1
:111122223333
:feature-group/your-feature-group-name
' @feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG) def transform(csv_input_df): return csv_input_df transform()
@feature_processor
參數包括:
-
inputs
(List[str]):Feature Store 特徵處理器中使用的資料來源清單。如果您的資料來源是功能群組或存放在 Amazon S3 中,您可以使用功能存放區為功能處理器提供的資料來源定義。如需 Feature Store 提供資料來源定義的完整清單,請參閱 Amazon Feature Store 讀取文件中的 SageMaker Feature Processor 資料來源。 -
output
(str):擷取裝飾函數輸出ARN的功能群組的 。 -
target_stores
(Optinal[List[str]]):要擷取至輸出的儲存清單 (例如,OnlineStore
或OfflineStore
)。如果未指定,則會將資料擷取到所有輸出功能群組的已啟用存放區。 -
parameters
(Dict[str, Any]):要提供給您的轉換函式的字典。 -
enable_ingestion
(bool):指出轉換函式的輸出是否已擷取至輸出特徵群組的旗標。此標誌在開發階段非常有用。如果未指定,則會啟用擷取。
可選的包裝函式參數 (如果在函式簽名中提供,則作為引數提供) 包括:
-
params
(Dict[str, Any]):在@feature_processor
參數中定義的字典。它還包含系統設定的參數,這些參數可以與鍵system
一起參考,例如scheduled_time
參數。 -
spark
(SparkSession):針對 Spark 應用程式初始化的 SparkSession 執行個體參考。
以下程式碼是使用 params
和 spark
參數的範例。
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor CSV_DATA_SOURCE = CSVDataSource('s3://
your-bucket
/prefix-to-csv
/') OUTPUT_FG = 'arn:aws:sagemaker:us-east-1
:111122223333
:feature-group/your-feature-group-name
' @feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG) def transform(csv_input_df, params, spark): scheduled_time = params['system']['scheduled_time'] csv_input_df.createOrReplaceTempView('csv_input_df') return spark.sql(f''' SELECT * FROM csv_input_df WHERE date_add(event_time, 1) >= {scheduled_time} ''') transform()
scheduled_time
系統參數 (在函式的params
引數中提供) 是支援重試每次執行的重要值。此值有助於唯一識別特徵處理器的執行,並可作為以日期為基礎的輸入的參考點 (例如,僅載入最近 24 小時的資料),以確保輸入範圍與程式碼的實際執行時間無關。如果特徵處理器按照排程執行 (請參閱以排程和事件為基礎執行特徵處理器管道),則其值會固定為排定執行的時間。在同步執行期間,可以使用 SDK的執行覆寫引數API,以支援資料回填或重新執行遺漏的過去執行等使用案例。如果功能處理器以任何其他方式執行,則其值是目前的時間。
如需編寫 Spark 程式碼的相關資訊,請參閱 Spark SQL程式設計指南
如需常見使用案例的更多程式碼範例,請參閱常見使用案例的特徵處理程式碼範例。
請注意,用裝飾的轉換函式@feature_processor
不會返回值。要以編程方式測試您的函式,您可以刪除或猴子修補@feature_processor
裝飾器,使其充當包裝函式的傳遞。如需@feature_processor
裝飾器的詳細資訊,請參閱 Amazon SageMaker Feature Store Python SDK