Pre-elaborazione e post-elaborazione - Amazon SageMaker

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Pre-elaborazione e post-elaborazione

È possibile utilizzare script Python personalizzati di pre-elaborazione e post-elaborazione per trasformare l'input al monitoraggio del modello o estendere il codice dopo un'esecuzione di monitoraggio riuscita. Carica questi script su Amazon S3 e utilizzali come riferimento durante la creazione del monitoraggio di modello.

L'esempio seguente mostra come personalizzare le pianificazioni di monitoraggio con script di pre-elaborazione e post-elaborazione. Replace (Sostituisci) user placeholder text con le tue informazioni.

import boto3, os from sagemaker import get_execution_role, Session from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor # Upload pre and postprocessor scripts session = Session() bucket = boto3.Session().resource("s3").Bucket(session.default_bucket()) prefix = "demo-sagemaker-model-monitor" pre_processor_script = bucket.Object(os.path.join(prefix, "preprocessor.py")).upload_file("preprocessor.py") post_processor_script = bucket.Object(os.path.join(prefix, "postprocessor.py")).upload_file("postprocessor.py") # Get execution role role = get_execution_role() # can be an empty string # Instance type instance_type = "instance-type" # instance_type = "ml.m5.xlarge" # Example # Create a monitoring schedule with pre and postprocessing my_default_monitor = DefaultModelMonitor( role=role, instance_count=1, instance_type=instance_type, volume_size_in_gb=20, max_runtime_in_seconds=3600, ) s3_report_path = "s3://{}/{}".format(bucket, "reports") monitor_schedule_name = "monitor-schedule-name" endpoint_name = "endpoint-name" my_default_monitor.create_monitoring_schedule( post_analytics_processor_script=post_processor_script, record_preprocessor_script=pre_processor_script, monitor_schedule_name=monitor_schedule_name, # use endpoint_input for real-time endpoint endpoint_input=endpoint_name, # or use batch_transform_input for batch transform jobs # batch_transform_input=batch_transform_name, output_s3_uri=s3_report_path, statistics=my_default_monitor.baseline_statistics(), constraints=my_default_monitor.suggested_constraints(), schedule_cron_expression=CronExpressionGenerator.hourly(), enable_cloudwatch_metrics=True, )

Script di pre-elaborazione

Utilizza gli script di pre-elaborazione quando devi trasformare gli input che arrivano al monitoraggio del modello.

Per esempio, supponiamo che l'output del tuo modello sia una matrice [1.0, 2.1]. Il contenitore Amazon SageMaker Model Monitor funziona solo con JSON strutture tabulari o appiattite, come. {prediction0”: 1.0, “prediction1” : 2.1} Puoi usare uno script di preelaborazione come il seguente per trasformare l'array nella struttura corretta. JSON

def preprocess_handler(inference_record): input_data = inference_record.endpoint_input.data output_data = inference_record.endpoint_output.data.rstrip("\n") data = output_data + "," + input_data return { str(i).zfill(20) : d for i, d in enumerate(data.split(",")) }

In un altro caso, supponiamo che il modello abbia funzionalità opzionali e che si utilizzi -1 per indicare che la funzionalità opzionale ha un valore mancante. Se disponi di un sistema di monitoraggio per la qualità dei dati, è consigliabile rimuovere -1 dalla matrice dei valori di input in modo che non venga incluso nei calcoli dei parametri di monitoraggio. Per rimuovere tali valori puoi utilizzare uno script come quello riportato di seguito.

def preprocess_handler(inference_record): input_data = inference_record.endpoint_input.data return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}

Lo script di pre-elaborazione riceve inference_record come unico input. Il seguente frammento di codice mostra un esempio di inference_record.

{ "captureData": { "endpointInput": { "observedContentType": "text/csv", "mode": "INPUT", "data": "132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1", "encoding": "CSV" }, "endpointOutput": { "observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT", "data": "0.01076381653547287", "encoding": "CSV" } }, "eventMetadata": { "eventId": "feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9", "inferenceTime": "2019-11-20T23:33:12Z" }, "eventVersion": "0" }

Il seguente frammento di codice mostra l'intera struttura di classe di inference_record.

KEY_EVENT_METADATA = "eventMetadata" KEY_EVENT_METADATA_EVENT_ID = "eventId" KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime" KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes" KEY_EVENTDATA_ENCODING = "encoding" KEY_EVENTDATA_DATA = "data" KEY_GROUND_TRUTH_DATA = "groundTruthData" KEY_EVENTDATA = "captureData" KEY_EVENTDATA_ENDPOINT_INPUT = "endpointInput" KEY_EVENTDATA_ENDPOINT_OUTPUT = "endpointOutput" KEY_EVENTDATA_BATCH_OUTPUT = "batchTransformOutput" KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType" KEY_EVENTDATA_MODE = "mode" KEY_EVENT_VERSION = "eventVersion" class EventConfig: def __init__(self, endpoint, variant, start_time, end_time): self.endpoint = endpoint self.variant = variant self.start_time = start_time self.end_time = end_time class EventMetadata: def __init__(self, event_metadata_dict): self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None) self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None) self.custom_attribute = event_metadata_dict.get(KEY_EVENT_METADATA_CUSTOM_ATTR, None) class EventData: def __init__(self, data_dict): self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None) self.data = data_dict.get(KEY_EVENTDATA_DATA, None) self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None) self.mode = data_dict.get(KEY_EVENTDATA_MODE, None) def as_dict(self): ret = { KEY_EVENTDATA_ENCODING: self.encoding, KEY_EVENTDATA_DATA: self.data, KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType, } return ret class CapturedData: def __init__(self, event_dict): self.event_metadata = None self.endpoint_input = None self.endpoint_output = None self.batch_transform_output = None self.ground_truth = None self.event_version = None self.event_dict = event_dict self._event_dict_postprocessed = False if KEY_EVENT_METADATA in event_dict: self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA]) if KEY_EVENTDATA in event_dict: if KEY_EVENTDATA_ENDPOINT_INPUT in event_dict[KEY_EVENTDATA]: self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT]) if KEY_EVENTDATA_ENDPOINT_OUTPUT in event_dict[KEY_EVENTDATA]: self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT]) if KEY_EVENTDATA_BATCH_OUTPUT in event_dict[KEY_EVENTDATA]: self.batch_transform_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT]) if KEY_GROUND_TRUTH_DATA in event_dict: self.ground_truth = EventData(event_dict[KEY_GROUND_TRUTH_DATA]) if KEY_EVENT_VERSION in event_dict: self.event_version = event_dict[KEY_EVENT_VERSION] def as_dict(self): if self._event_dict_postprocessed is True: return self.event_dict if KEY_EVENTDATA in self.event_dict: if KEY_EVENTDATA_ENDPOINT_INPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT] = self.endpoint_input.as_dict() if KEY_EVENTDATA_ENDPOINT_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][ KEY_EVENTDATA_ENDPOINT_OUTPUT ] = self.endpoint_output.as_dict() if KEY_EVENTDATA_BATCH_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT] = self.batch_transform_output.as_dict() self._event_dict_postprocessed = True return self.event_dict def __str__(self): return str(self.as_dict())

Campionamento personalizzato

Puoi anche applicare una strategia di campionamento personalizzata nello script di pre-elaborazione. A tale scopo, configura il container proprietario e predefinito di Model Monitor in modo che ignori una percentuale dei record in base alla frequenza di campionamento specificata. Nell'esempio seguente, il gestore campiona il 10% dei record restituendo il record nel 10% delle chiamate al gestore e in caso contrario un elenco vuoto.

import random def preprocess_handler(inference_record): # we set up a sampling rate of 0.1 if random.random() > 0.1: # return an empty list return [] input_data = inference_record.endpoint_input.data return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}

Registrazione personalizzata per lo script di pre-elaborazione

Se lo script di preelaborazione restituisce un errore, controlla i messaggi di eccezione registrati in debug. CloudWatch È possibile accedere al logger tramite l'interfaccia. CloudWatch preprocess_handler Puoi registrare tutte le informazioni di cui hai bisogno dal tuo script a CloudWatch. Potrebbe essere utile per eseguire il debug dello script di pre-elaborazione. L'esempio seguente mostra come utilizzare l'preprocess_handlerinterfaccia per accedere a CloudWatch

def preprocess_handler(inference_record, logger): logger.info(f"I'm a processing record: {inference_record}") logger.debug(f"I'm debugging a processing record: {inference_record}") logger.warning(f"I'm processing record with missing value: {inference_record}") logger.error(f"I'm a processing record with bad value: {inference_record}") return inference_record

Script di post-elaborazione

Utilizza uno script di post-elaborazione quando desideri estendere il codice dopo un'esecuzione di monitoraggio riuscita.

def postprocess_handler(): print("Hello from post-proc script!")