As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Exemplo de código de Processamento de atributos para casos de uso comuns
Os exemplos a seguir fornecem amostras de código de Processamento de atributos para casos de uso comuns. Para um exemplo mais detalhado de caderno mostrando casos de uso específicos, consulte o caderno de processamento de SageMaker recursos da Amazon Feature Store
Nos exemplos a seguir,
é a região do recurso, us-east-1
é o ID da conta do proprietário do recurso e 111122223333
é o nome do grupo de atributos.your-feature-group-name
O conjunto de dados transactions
usado nos exemplos a seguir tem o seguinte esquema:
'FeatureDefinitions': [ {'FeatureName': 'txn_id', 'FeatureType': 'String'}, {'FeatureName': 'txn_time', 'FeatureType': 'String'}, {'FeatureName': 'credit_card_num', 'FeatureType': 'String'}, {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'} ]
Tópicos
Junção de dados de várias fontes de dados
@feature_processor( inputs=[ CSVDataSource('s3://bucket/customer'), FeatureGroupDataSource('transactions') ], output='arn:aws:sagemaker:
us-east-1
:111122223333
:feature-group/your-feature-group-name
' ) def join(transactions_df, customer_df): '''Combine two data sources with an inner join on a common column''' return transactions_df.join( customer_df, transactions_df.customer_id == customer_df.customer_id, "inner" )
Agregados de janelas deslizantes
@feature_processor( inputs=[FeatureGroupDataSource('transactions')], output='arn:aws:sagemaker:
us-east-1
:111122223333
:feature-group/your-feature-group-name
' ) def sliding_window_aggregates(transactions_df): '''Aggregates over 1-week windows, across 1-day sliding windows.''' from pyspark.sql.functions import window, avg, count return ( transactions_df .groupBy("credit_card_num", window("txn_time", "1 week", "1 day")) .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week")) .orderBy("window.start") .select("credit_card_num", "window.start", "avg_week", "count_week") )
Agregados de janelas em cascata
@feature_processor( inputs=[FeatureGroupDataSource('transactions')], output='arn:aws:sagemaker:
us-east-1
:111122223333
:feature-group/your-feature-group-name
' ) def tumbling_window_aggregates(transactions_df, spark): '''Aggregates over 1-week windows, across 1-day tumbling windows, as a SQL query.''' transactions_df.createOrReplaceTempView('transactions') return spark.sql(f''' SELECT credit_card_num, window.start, AVG(amount) AS avg, COUNT(*) AS count FROM transactions GROUP BY credit_card_num, window(txn_time, "1 week") ORDER BY window.start ''')
Promoção do armazenamento offline para o armazenamento on-line
@feature_processor( inputs=[FeatureGroupDataSource('transactions')], target_stores=['OnlineStore'], output='arn:aws:sagemaker:
us-east-1
:111122223333
:feature-group/transactions' ) def offline_to_online(): '''Move data from the offline store to the online store of the same feature group.''' transactions_df.createOrReplaceTempView('transactions') return spark.sql(f''' SELECT txn_id, txn_time, credit_card_num, amount FROM (SELECT *, row_number() OVER (PARTITION BY txn_id ORDER BY "txn_time" DESC, Api_Invocation_Time DESC, write_time DESC) AS row_number FROM transactions) WHERE row_number = 1 ''')
Transformações com a biblioteca Pandas
Transformações com a biblioteca Pandas
@feature_processor( inputs=[FeatureGroupDataSource('transactions')], target_stores=['OnlineStore'], output='arn:aws:sagemaker:
us-east-1
:111122223333
:feature-group/transactions' ) def pandas(transactions_df): '''Author transformations using the Pandas interface. Requires PyArrow to be installed via pip. For more details: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark ''' import pyspark.pandas as ps # PySpark DF to Pandas-On-Spark DF (Distributed DF with Pandas interface). pandas_on_spark_df = transactions_df.pandas_api() # Pandas-On-Spark DF to Pandas DF (Single Machine Only). pandas_df = pandas_on_spark_df.to_pandas() # Reverse: Pandas DF to Pandas-On-Spark DF pandas_on_spark_df = ps.from_pandas(pandas_df) # Reverse: Pandas-On-Spark DF to PySpark DF spark_df = pandas_on_spark_df.to_spark() return spark_df
Execuções contínuas e novas tentativas automáticas usando gatilhos baseados em eventos
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus streaming_pipeline_name = "
target-pipeline
" to_pipeline( pipeline_name=streaming_pipeline_name, step=transform ) put_trigger( source_pipeline_events=[ FeatureProcessorPipelineEvent( pipeline_name=streaming_pipeline_name, pipeline_execution_status=[ FeatureProcessorPipelineExecutionStatus.STOPPED, FeatureProcessorPipelineExecutionStatus.FAILED] ) ], target_pipeline=streaming_pipeline_name )