Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Prétraitement et post-traitement
Vous pouvez utiliser des scripts Python de prétraitement et de post-traitement personnalisés pour transformer l'entrée de votre surveillance de modèle ou étendre le code après une exécution de surveillance réussie. Téléchargez ces scripts sur Amazon S3 et référencez-les lors de la création de votre surveillance de modèle.
L'exemple suivant montre comment personnaliser les planifications de surveillance à l'aide de scripts de prétraitement et de post-traitement. Remplacez user placeholder
text
avec vos propres informations.
import boto3, os from sagemaker import get_execution_role, Session from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor # Upload pre and postprocessor scripts session = Session() bucket = boto3.Session().resource("s3").Bucket(session.default_bucket()) prefix = "
demo-sagemaker-model-monitor
" pre_processor_script = bucket.Object(os.path.join(prefix, "preprocessor
.py")).upload_file("preprocessor
.py") post_processor_script = bucket.Object(os.path.join(prefix, "postprocessor
.py")).upload_file("postprocessor
.py") # Get execution role role = get_execution_role() # can be an empty string # Instance type instance_type = "instance-type
" # instance_type = "ml.m5.xlarge" # Example # Create a monitoring schedule with pre and postprocessing my_default_monitor = DefaultModelMonitor( role=role, instance_count=1
, instance_type=instance_type, volume_size_in_gb=20
, max_runtime_in_seconds=3600
, ) s3_report_path = "s3://{}/{}".format(bucket, "reports
") monitor_schedule_name = "monitor-schedule-name
" endpoint_name = "endpoint-name
" my_default_monitor.create_monitoring_schedule( post_analytics_processor_script=post_processor_script, record_preprocessor_script=pre_processor_script, monitor_schedule_name=monitor_schedule_name, # use endpoint_input for real-time endpoint endpoint_input=endpoint_name, # or use batch_transform_input for batch transform jobs # batch_transform_input=batch_transform_name, output_s3_uri=s3_report_path, statistics=my_default_monitor.baseline_statistics(), constraints=my_default_monitor.suggested_constraints(), schedule_cron_expression=CronExpressionGenerator.hourly(), enable_cloudwatch_metrics=True, )
Script de prétraitement
Utilisez des scripts de prétraitement lorsque vous devez transformer les entrées de votre surveillance de modèle.
Supposons, par exemple, que la sortie de votre modèle soit un tableau [1.0,
2.1]
. Le conteneur Amazon SageMaker Model Monitor ne fonctionne qu'avec des JSON structures tabulaires ou aplaties, par exemple. {“
Vous pouvez utiliser un script de prétraitement tel que le suivant pour transformer le tableau dans la bonne JSON structure.prediction0
”: 1.0,
“prediction1
” : 2.1}
def preprocess_handler(inference_record): input_data = inference_record.endpoint_input.data output_data = inference_record.endpoint_output.data.rstrip("\n") data = output_data + "," + input_data return { str(i).zfill(20) : d for i, d in enumerate(data.split(",")) }
Dans un autre exemple, supposons que votre modèle comporte des fonctions facultatives et que vous utilisiez -1
pour indiquer que la fonction facultative possède une valeur manquante. Si vous disposez d'une surveillance de qualité des données, vous pouvez le supprimer -1
du tableau des valeurs d'entrée afin qu'il ne soit pas inclus dans les calculs métriques de la surveillance. Vous pouvez utiliser un script comme celui-ci pour supprimer ces valeurs.
def preprocess_handler(inference_record): input_data = inference_record.endpoint_input.data return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}
Votre script de prétraitement reçoit inference_record
comme seule entrée. L'extrait de code suivant illustre un exemple de inference_record
.
{ "captureData": { "endpointInput": { "observedContentType": "text/csv", "mode": "INPUT", "data": "
132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1
", "encoding": "CSV" }, "endpointOutput": { "observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT", "data": "0.01076381653547287
", "encoding": "CSV" } }, "eventMetadata": { "eventId": "feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9
", "inferenceTime": "2019-11-20T23:33:12Z
" }, "eventVersion": "0
" }
L'extrait de code suivant illustre la structure complète d'une classe pour inference_record
.
KEY_EVENT_METADATA = "eventMetadata" KEY_EVENT_METADATA_EVENT_ID = "eventId" KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime" KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes" KEY_EVENTDATA_ENCODING = "encoding" KEY_EVENTDATA_DATA = "data" KEY_GROUND_TRUTH_DATA = "groundTruthData" KEY_EVENTDATA = "captureData" KEY_EVENTDATA_ENDPOINT_INPUT = "endpointInput" KEY_EVENTDATA_ENDPOINT_OUTPUT = "endpointOutput" KEY_EVENTDATA_BATCH_OUTPUT = "batchTransformOutput" KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType" KEY_EVENTDATA_MODE = "mode" KEY_EVENT_VERSION = "eventVersion" class EventConfig: def __init__(self, endpoint, variant, start_time, end_time): self.endpoint = endpoint self.variant = variant self.start_time = start_time self.end_time = end_time class EventMetadata: def __init__(self, event_metadata_dict): self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None) self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None) self.custom_attribute = event_metadata_dict.get(KEY_EVENT_METADATA_CUSTOM_ATTR, None) class EventData: def __init__(self, data_dict): self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None) self.data = data_dict.get(KEY_EVENTDATA_DATA, None) self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None) self.mode = data_dict.get(KEY_EVENTDATA_MODE, None) def as_dict(self): ret = { KEY_EVENTDATA_ENCODING: self.encoding, KEY_EVENTDATA_DATA: self.data, KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType, } return ret class CapturedData: def __init__(self, event_dict): self.event_metadata = None self.endpoint_input = None self.endpoint_output = None self.batch_transform_output = None self.ground_truth = None self.event_version = None self.event_dict = event_dict self._event_dict_postprocessed = False if KEY_EVENT_METADATA in event_dict: self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA]) if KEY_EVENTDATA in event_dict: if KEY_EVENTDATA_ENDPOINT_INPUT in event_dict[KEY_EVENTDATA]: self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT]) if KEY_EVENTDATA_ENDPOINT_OUTPUT in event_dict[KEY_EVENTDATA]: self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT]) if KEY_EVENTDATA_BATCH_OUTPUT in event_dict[KEY_EVENTDATA]: self.batch_transform_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT]) if KEY_GROUND_TRUTH_DATA in event_dict: self.ground_truth = EventData(event_dict[KEY_GROUND_TRUTH_DATA]) if KEY_EVENT_VERSION in event_dict: self.event_version = event_dict[KEY_EVENT_VERSION] def as_dict(self): if self._event_dict_postprocessed is True: return self.event_dict if KEY_EVENTDATA in self.event_dict: if KEY_EVENTDATA_ENDPOINT_INPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT] = self.endpoint_input.as_dict() if KEY_EVENTDATA_ENDPOINT_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][ KEY_EVENTDATA_ENDPOINT_OUTPUT ] = self.endpoint_output.as_dict() if KEY_EVENTDATA_BATCH_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT] = self.batch_transform_output.as_dict() self._event_dict_postprocessed = True return self.event_dict def __str__(self): return str(self.as_dict())
Échantillonnage personnalisé
Vous pouvez également appliquer une stratégie d'échantillonnage personnalisée dans votre script de prétraitement. Pour ce faire, configurez le conteneur prédéfini de Model Monitor de manière à ignorer un pourcentage des enregistrements en fonction de la fréquence d'échantillonnage que vous avez spécifiée. Dans l'exemple suivant, le gestionnaire échantillonne 10 % des enregistrements en renvoyant l'enregistrement dans 10 % des appels du gestionnaire et en renvoyant une liste vide dans le cas contraire.
import random def preprocess_handler(inference_record): # we set up a sampling rate of 0.1 if random.random() > 0.1: # return an empty list return [] input_data = inference_record.endpoint_input.data return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}
Journalisation personnalisée pour le script de prétraitement
Si votre script de prétraitement renvoie une erreur, vérifiez les messages d'exception enregistrés CloudWatch pour le débogage. Vous pouvez accéder à l'enregistreur CloudWatch via l'preprocess_handler
interface. Vous pouvez enregistrer toutes les informations dont vous avez besoin depuis votre script dans CloudWatch. Cela peut être utile lors du débogage de votre script de prétraitement. L'exemple suivant montre comment vous pouvez utiliser l'preprocess_handler
interface pour vous connecter à CloudWatch
def preprocess_handler(inference_record, logger): logger.info(f"I'm a processing record: {inference_record}") logger.debug(f"I'm debugging a processing record: {inference_record}") logger.warning(f"I'm processing record with missing value: {inference_record}") logger.error(f"I'm a processing record with bad value: {inference_record}") return inference_record
Script de post-traitement
Utilisez un script de post-traitement lorsque vous souhaitez étendre le code après une exécution de surveillance réussie.
def postprocess_handler(): print("Hello from post-proc script!")