Ejemplo de código de procesamiento de características para casos de uso habituales - Amazon SageMaker AI

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ejemplo de código de procesamiento de características para casos de uso habituales

Los siguientes ejemplos incluyen muestras de código de procesamiento de características para casos de uso habituales. Para ver un ejemplo de cuaderno más detallado que muestre casos de uso específicos, consulta el cuaderno de procesamiento de características de Amazon SageMaker Feature Store.

En los siguientes ejemplos, us-east-1 es la región del recurso, 111122223333 es el ID de la cuenta propietaria del recurso y your-feature-group-name es el nombre del grupo de características.

El conjunto de datos transactions utilizado en los siguientes ejemplos tiene el siguiente esquema:

'FeatureDefinitions': [ {'FeatureName': 'txn_id', 'FeatureType': 'String'}, {'FeatureName': 'txn_time', 'FeatureType': 'String'}, {'FeatureName': 'credit_card_num', 'FeatureType': 'String'}, {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'} ]

Unión de datos desde varios orígenes de datos

@feature_processor( inputs=[ CSVDataSource('s3://bucket/customer'), FeatureGroupDataSource('transactions') ], output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' ) def join(transactions_df, customer_df): '''Combine two data sources with an inner join on a common column''' return transactions_df.join( customer_df, transactions_df.customer_id == customer_df.customer_id, "inner" )

Agregación en ventanas deslizantes

@feature_processor( inputs=[FeatureGroupDataSource('transactions')], output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' ) def sliding_window_aggregates(transactions_df): '''Aggregates over 1-week windows, across 1-day sliding windows.''' from pyspark.sql.functions import window, avg, count return ( transactions_df .groupBy("credit_card_num", window("txn_time", "1 week", "1 day")) .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week")) .orderBy("window.start") .select("credit_card_num", "window.start", "avg_week", "count_week") )

Agregación en ventanas de saltos de tamaño constante

@feature_processor( inputs=[FeatureGroupDataSource('transactions')], output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' ) def tumbling_window_aggregates(transactions_df, spark): '''Aggregates over 1-week windows, across 1-day tumbling windows, as a SQL query.''' transactions_df.createOrReplaceTempView('transactions') return spark.sql(f''' SELECT credit_card_num, window.start, AVG(amount) AS avg, COUNT(*) AS count FROM transactions GROUP BY credit_card_num, window(txn_time, "1 week") ORDER BY window.start ''')

Promoción del almacenamiento sin conexión al almacenamiento en línea

@feature_processor( inputs=[FeatureGroupDataSource('transactions')], target_stores=['OnlineStore'], output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions' ) def offline_to_online(): '''Move data from the offline store to the online store of the same feature group.''' transactions_df.createOrReplaceTempView('transactions') return spark.sql(f''' SELECT txn_id, txn_time, credit_card_num, amount FROM (SELECT *, row_number() OVER (PARTITION BY txn_id ORDER BY "txn_time" DESC, Api_Invocation_Time DESC, write_time DESC) AS row_number FROM transactions) WHERE row_number = 1 ''')

Transformaciones con la biblioteca Pandas

Transformaciones con la biblioteca Pandas

@feature_processor( inputs=[FeatureGroupDataSource('transactions')], target_stores=['OnlineStore'], output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions' ) def pandas(transactions_df): '''Author transformations using the Pandas interface. Requires PyArrow to be installed via pip. For more details: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark ''' import pyspark.pandas as ps # PySpark DF to Pandas-On-Spark DF (Distributed DF with Pandas interface). pandas_on_spark_df = transactions_df.pandas_api() # Pandas-On-Spark DF to Pandas DF (Single Machine Only). pandas_df = pandas_on_spark_df.to_pandas() # Reverse: Pandas DF to Pandas-On-Spark DF pandas_on_spark_df = ps.from_pandas(pandas_df) # Reverse: Pandas-On-Spark DF to PySpark DF spark_df = pandas_on_spark_df.to_spark() return spark_df

Ejecuciones continuas y reintentos automáticos mediante desencadenadores basados en eventos

from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus streaming_pipeline_name = "target-pipeline" to_pipeline( pipeline_name=streaming_pipeline_name, step=transform ) put_trigger( source_pipeline_events=[ FeatureProcessorPipelineEvent( pipeline_name=streaming_pipeline_name, pipeline_execution_status=[ FeatureProcessorPipelineExecutionStatus.STOPPED, FeatureProcessorPipelineExecutionStatus.FAILED] ) ], target_pipeline=streaming_pipeline_name )