SDK del procesador de características del almacén de características - Amazon SageMaker AI

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

SDK del procesador de características del almacén de características

Determine una definición del procesador de características del almacén de características mediante la decoración de sus características de transformación con el decorador @feature_processor. El SDK de SageMaker IA para Python (Boto3) carga automáticamente los datos de las fuentes de datos de entrada configuradas, aplica la función de transformación decorada y, a continuación, ingiere los datos transformados en un grupo de características objetivo. Las funciones de transformación decoradas deben ajustarse a la firma esperada del decorador @feature_processor. Para obtener más información sobre el @feature_processor decorador, consulta @feature_processor Decorator en Amazon SageMaker Feature Store Lee los documentos.

Con el @feature_processor decorador, la función de transformación se ejecuta en un entorno de ejecución de Spark en el que los argumentos de entrada proporcionados a la función y su valor devuelto son Spark. DataFrames El número de parámetros de entrada de la función de transformación debe coincidir con el número de entradas configuradas en el decorador @feature_processor.

Para obtener más información sobre el decorador @feature_processor, consulte Feature Processor Feature Store SDK for Python (Boto3).

El siguiente código contiene ejemplos básicos de uso del decorador @feature_processor. Para ver ejemplos de casos de uso de ejemplo más específicos, consulte Ejemplo de código de procesamiento de características para casos de uso habituales.

El SDK del procesador de funciones se puede instalar desde el SDK de SageMaker Python y sus extras mediante el siguiente comando.

pip install sagemaker[feature-processor]

En los siguientes ejemplos, us-east-1 es la región del recurso, 111122223333 es el ID de la cuenta propietaria del recurso y your-feature-group-name es el nombre del grupo de características.

La siguiente es una definición básica del procesador de características, en la que el decorador @feature_processor configura una entrada CSV de Amazon S3 para cargarla y proporcionarla a la función de transformación (por ejemplo, transform), y la prepara para su incorporación a un grupo de características. Lo ejecuta la última línea.

from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/') OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' @feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG) def transform(csv_input_df): return csv_input_df transform()

El parámetro @feature_processor incluye:

  • inputs (List[str]): una lista de orígenes de datos que se utilizan en el procesador de características del almacén de características. Si los orígenes de datos son grupos de características o están almacenados en Amazon S3, es posible que pueda utilizar la definiciones de orígenes de datos proporcionadas por el almacén de características para el procesador de características. Para obtener una lista completa de las definiciones de fuentes de datos proporcionadas por Feature Store, consulte la fuente de datos del procesador de funciones en Amazon SageMaker Feature Store Lea los documentos.

  • output (str): el ARN del grupo de características para ingerir la salida de la función decorada.

  • target_stores (Optional[List[str]]): una lista de almacenes (por ejemplo, OnlineStore u OfflineStore) para ingerir en la salida. Si no se especifica, los datos se ingieren en todos los almacenes habilitados del grupo de características de salida.

  • parameters (Dict[str, Any]): un diccionario que se proporcionará a la función de transformación.

  • enable_ingestion (bool): un indicador que señala si las salidas de la función de transformación se ingieren en el grupo de características de salida. Este indicador es útil durante la fase de desarrollo. Si no se especifica, la ingestión está habilitada.

Los parámetros opcionales de la función encapsulada (que se proporcionan como argumento si se proporcionan en la firma de la función) incluyen:

  • params (Dict[str, Any]): el diccionario definido en los parámetros del @feature_processor. También contiene parámetros configurados por el sistema a los que se puede hacer referencia con la clave system, como el parámetro scheduled_time.

  • spark(SparkSession): una referencia a la SparkSession instancia inicializada para la aplicación Spark.

El siguiente código es un ejemplo del uso de los parámetros params y spark.

from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/') OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' @feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG) def transform(csv_input_df, params, spark): scheduled_time = params['system']['scheduled_time'] csv_input_df.createOrReplaceTempView('csv_input_df') return spark.sql(f''' SELECT * FROM csv_input_df WHERE date_add(event_time, 1) >= {scheduled_time} ''') transform()

El parámetro del sistema scheduled_time (que se proporciona en el argumento params a su función) es un valor importante para respaldar el reintento de cada ejecución. El valor puede ayudar a identificar de forma única la ejecución del procesador de características y puede usarse como punto de referencia para las entradas basadas en el intervalo de fechas (por ejemplo, si solo se cargan los datos de las últimas 24 horas) para garantizar que el intervalo de entrada sea independiente del tiempo de ejecución real del código. Si el procesador de características se ejecuta según una programación (consulte Ejecuciones programadas y basadas en eventos para las canalizaciones del procesador de características), su valor se fija en la hora en la que está programada su ejecución. El argumento se puede anular durante la ejecución sincrónica mediante la API de ejecución del SDK para admitir casos de uso como la reposición de datos o la repetición de una ejecución anterior omitida. Su valor es la hora actual si el procesador de características se ejecuta de otra manera.

Para obtener información sobre la creación de código de Spark, consulte Spark SQL Programming Guide.

Para obtener más ejemplos de código para casos de uso habituales, consulte Ejemplo de código de procesamiento de características para casos de uso habituales.

Tenga en cuenta que las funciones de transformación decoradas con @feature_processor no devuelven ningún valor. Para probar la función mediante programación, puede quitar o aplicar un parche al decorador @feature_processor para que actúe como paso a la función encapsulada. Para obtener más información sobre el @feature_processor decorador, consulta el SDK de Python de Amazon SageMaker Feature Store.