Esempio di codice di elaborazione della funzionalità per casi d'uso comuni - Amazon SageMaker

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esempio di codice di elaborazione della funzionalità per casi d'uso comuni

Nei seguenti esempi vengono riportati esempi di codice di elaborazione delle funzionalità per casi d'uso comuni. Per un notebook di esempio più dettagliato che mostra casi d'uso specifici, consulta il notebook Amazon SageMaker Feature Store Feature Processing.

Negli esempi seguenti, us-east-1 è la Regione della risorsa, 111122223333 è l'ID dell'account proprietario della risorsa e your-feature-group-name è il nome del gruppo di funzionalità.

Il set di dati transactions utilizzato negli esempi seguenti presenta lo schema seguente:

'FeatureDefinitions': [ {'FeatureName': 'txn_id', 'FeatureType': 'String'}, {'FeatureName': 'txn_time', 'FeatureType': 'String'}, {'FeatureName': 'credit_card_num', 'FeatureType': 'String'}, {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'} ]

Unione di dati da più origini dati

@feature_processor( inputs=[ CSVDataSource('s3://bucket/customer'), FeatureGroupDataSource('transactions') ], output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' ) def join(transactions_df, customer_df): '''Combine two data sources with an inner join on a common column''' return transactions_df.join( customer_df, transactions_df.customer_id == customer_df.customer_id, "inner" )

Aggregati basati su finestre temporali scorrevoli

@feature_processor( inputs=[FeatureGroupDataSource('transactions')], output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' ) def sliding_window_aggregates(transactions_df): '''Aggregates over 1-week windows, across 1-day sliding windows.''' from pyspark.sql.functions import window, avg, count return ( transactions_df .groupBy("credit_card_num", window("txn_time", "1 week", "1 day")) .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week")) .orderBy("window.start") .select("credit_card_num", "window.start", "avg_week", "count_week") )

Aggregati basati su finestre temporali a cascata

@feature_processor( inputs=[FeatureGroupDataSource('transactions')], output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' ) def tumbling_window_aggregates(transactions_df, spark): '''Aggregates over 1-week windows, across 1-day tumbling windows, as a SQL query.''' transactions_df.createOrReplaceTempView('transactions') return spark.sql(f''' SELECT credit_card_num, window.start, AVG(amount) AS avg, COUNT(*) AS count FROM transactions GROUP BY credit_card_num, window(txn_time, "1 week") ORDER BY window.start ''')

Promozione dall'archivio offline all'archivio online

@feature_processor( inputs=[FeatureGroupDataSource('transactions')], target_stores=['OnlineStore'], output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions' ) def offline_to_online(): '''Move data from the offline store to the online store of the same feature group.''' transactions_df.createOrReplaceTempView('transactions') return spark.sql(f''' SELECT txn_id, txn_time, credit_card_num, amount FROM (SELECT *, row_number() OVER (PARTITION BY txn_id ORDER BY "txn_time" DESC, Api_Invocation_Time DESC, write_time DESC) AS row_number FROM transactions) WHERE row_number = 1 ''')

Trasformazioni con la libreria Pandas

Trasformazioni con la libreria Pandas

@feature_processor( inputs=[FeatureGroupDataSource('transactions')], target_stores=['OnlineStore'], output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions' ) def pandas(transactions_df): '''Author transformations using the Pandas interface. Requires PyArrow to be installed via pip. For more details: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark ''' import pyspark.pandas as ps # PySpark DF to Pandas-On-Spark DF (Distributed DF with Pandas interface). pandas_on_spark_df = transactions_df.pandas_api() # Pandas-On-Spark DF to Pandas DF (Single Machine Only). pandas_df = pandas_on_spark_df.to_pandas() # Reverse: Pandas DF to Pandas-On-Spark DF pandas_on_spark_df = ps.from_pandas(pandas_df) # Reverse: Pandas-On-Spark DF to PySpark DF spark_df = pandas_on_spark_df.to_spark() return spark_df

Esecuzioni continue e tentativi automatici utilizzando trigger basati su eventi

from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus streaming_pipeline_name = "target-pipeline" to_pipeline( pipeline_name=streaming_pipeline_name, step=transform ) put_trigger( source_pipeline_events=[ FeatureProcessorPipelineEvent( pipeline_name=streaming_pipeline_name, pipeline_execution_status=[ FeatureProcessorPipelineExecutionStatus.STOPPED, FeatureProcessorPipelineExecutionStatus.FAILED] ) ], target_pipeline=streaming_pipeline_name )