Vorverarbeitung und Nachbearbeitung - Amazon SageMaker

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Vorverarbeitung und Nachbearbeitung

Sie können benutzerdefinierte Python-Skripte für die Vor- und Nachverarbeitung verwenden, um die Eingabe in Ihren Modellmonitor zu transformieren oder den Code nach einem erfolgreichen Überwachungslauf zu erweitern. Laden Sie diese Skripts auf Amazon S3 hoch und referenzieren Sie sie, wenn Sie Ihren Modellmonitor erstellen.

Das folgende Beispiel zeigt, wie Sie Überwachungspläne mit Vor- und Nachverarbeitungsskripten anpassen können. Ersetzen user placeholder text mit Ihren eigenen Informationen.

import boto3, os from sagemaker import get_execution_role, Session from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor # Upload pre and postprocessor scripts session = Session() bucket = boto3.Session().resource("s3").Bucket(session.default_bucket()) prefix = "demo-sagemaker-model-monitor" pre_processor_script = bucket.Object(os.path.join(prefix, "preprocessor.py")).upload_file("preprocessor.py") post_processor_script = bucket.Object(os.path.join(prefix, "postprocessor.py")).upload_file("postprocessor.py") # Get execution role role = get_execution_role() # can be an empty string # Instance type instance_type = "instance-type" # instance_type = "ml.m5.xlarge" # Example # Create a monitoring schedule with pre and postprocessing my_default_monitor = DefaultModelMonitor( role=role, instance_count=1, instance_type=instance_type, volume_size_in_gb=20, max_runtime_in_seconds=3600, ) s3_report_path = "s3://{}/{}".format(bucket, "reports") monitor_schedule_name = "monitor-schedule-name" endpoint_name = "endpoint-name" my_default_monitor.create_monitoring_schedule( post_analytics_processor_script=post_processor_script, record_preprocessor_script=pre_processor_script, monitor_schedule_name=monitor_schedule_name, # use endpoint_input for real-time endpoint endpoint_input=endpoint_name, # or use batch_transform_input for batch transform jobs # batch_transform_input=batch_transform_name, output_s3_uri=s3_report_path, statistics=my_default_monitor.baseline_statistics(), constraints=my_default_monitor.suggested_constraints(), schedule_cron_expression=CronExpressionGenerator.hourly(), enable_cloudwatch_metrics=True, )

Vorverarbeitungsskript

Verwenden Sie Vorverarbeitungsskripten, wenn Sie die Eingaben für Ihren Modellmonitor transformieren müssen.

Nehmen wir beispielsweise an, die Ausgabe Ihres Modells ist ein Array [1.0, 2.1]. Der Amazon SageMaker Model Monitor-Container funktioniert nur mit tabellarischen oder abgeflachten JSON Strukturen wie. {prediction0”: 1.0, “prediction1” : 2.1} Sie könnten ein Vorverarbeitungsskript wie das folgende verwenden, um das Array in die richtige Struktur umzuwandeln. JSON

def preprocess_handler(inference_record): input_data = inference_record.endpoint_input.data output_data = inference_record.endpoint_output.data.rstrip("\n") data = output_data + "," + input_data return { str(i).zfill(20) : d for i, d in enumerate(data.split(",")) }

Nehmen wir in einem anderen Beispiel an, dass Ihr Modell optionale Funktionen hat und -1 angeben, dass das optionale Feature einen fehlenden Wert hat. Wenn Sie über einen Datenqualitätsmonitor verfügen, sollten Sie den -1 aus dem Eingabe-Werte-Array entfernen, damit er nicht in den metrischen Berechnungen des Monitors berücksichtigt wird. Sie könnten ein Skript wie das folgende verwenden, um diese Werte zu entfernen.

def preprocess_handler(inference_record): input_data = inference_record.endpoint_input.data return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}

Ihr Vorverarbeitungsskript erhält inference_record als einzige Eingabe eine. Der folgende Codeschnipsel zeigt ein Beispiel für ein inference_record.

{ "captureData": { "endpointInput": { "observedContentType": "text/csv", "mode": "INPUT", "data": "132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1", "encoding": "CSV" }, "endpointOutput": { "observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT", "data": "0.01076381653547287", "encoding": "CSV" } }, "eventMetadata": { "eventId": "feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9", "inferenceTime": "2019-11-20T23:33:12Z" }, "eventVersion": "0" }

Der folgende Codeschnipsel zeigt die vollständige Klassenstruktur für eine inference_record.

KEY_EVENT_METADATA = "eventMetadata" KEY_EVENT_METADATA_EVENT_ID = "eventId" KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime" KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes" KEY_EVENTDATA_ENCODING = "encoding" KEY_EVENTDATA_DATA = "data" KEY_GROUND_TRUTH_DATA = "groundTruthData" KEY_EVENTDATA = "captureData" KEY_EVENTDATA_ENDPOINT_INPUT = "endpointInput" KEY_EVENTDATA_ENDPOINT_OUTPUT = "endpointOutput" KEY_EVENTDATA_BATCH_OUTPUT = "batchTransformOutput" KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType" KEY_EVENTDATA_MODE = "mode" KEY_EVENT_VERSION = "eventVersion" class EventConfig: def __init__(self, endpoint, variant, start_time, end_time): self.endpoint = endpoint self.variant = variant self.start_time = start_time self.end_time = end_time class EventMetadata: def __init__(self, event_metadata_dict): self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None) self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None) self.custom_attribute = event_metadata_dict.get(KEY_EVENT_METADATA_CUSTOM_ATTR, None) class EventData: def __init__(self, data_dict): self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None) self.data = data_dict.get(KEY_EVENTDATA_DATA, None) self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None) self.mode = data_dict.get(KEY_EVENTDATA_MODE, None) def as_dict(self): ret = { KEY_EVENTDATA_ENCODING: self.encoding, KEY_EVENTDATA_DATA: self.data, KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType, } return ret class CapturedData: def __init__(self, event_dict): self.event_metadata = None self.endpoint_input = None self.endpoint_output = None self.batch_transform_output = None self.ground_truth = None self.event_version = None self.event_dict = event_dict self._event_dict_postprocessed = False if KEY_EVENT_METADATA in event_dict: self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA]) if KEY_EVENTDATA in event_dict: if KEY_EVENTDATA_ENDPOINT_INPUT in event_dict[KEY_EVENTDATA]: self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT]) if KEY_EVENTDATA_ENDPOINT_OUTPUT in event_dict[KEY_EVENTDATA]: self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT]) if KEY_EVENTDATA_BATCH_OUTPUT in event_dict[KEY_EVENTDATA]: self.batch_transform_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT]) if KEY_GROUND_TRUTH_DATA in event_dict: self.ground_truth = EventData(event_dict[KEY_GROUND_TRUTH_DATA]) if KEY_EVENT_VERSION in event_dict: self.event_version = event_dict[KEY_EVENT_VERSION] def as_dict(self): if self._event_dict_postprocessed is True: return self.event_dict if KEY_EVENTDATA in self.event_dict: if KEY_EVENTDATA_ENDPOINT_INPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT] = self.endpoint_input.as_dict() if KEY_EVENTDATA_ENDPOINT_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][ KEY_EVENTDATA_ENDPOINT_OUTPUT ] = self.endpoint_output.as_dict() if KEY_EVENTDATA_BATCH_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT] = self.batch_transform_output.as_dict() self._event_dict_postprocessed = True return self.event_dict def __str__(self): return str(self.as_dict())

Benutzerdefinierte Probenahme

Sie können in Ihrem Vorverarbeitungsskript auch eine benutzerdefinierte Sampling-Strategie anwenden. Konfigurieren Sie dazu den vorgefertigten Container von Model Monitor aus erster Hand so, dass ein bestimmter Prozentsatz der Datensätze entsprechend der von Ihnen angegebenen Sampling-Rate ignoriert wird. Im folgenden Beispiel nimmt der Handler eine Stichprobe von 10 Prozent der Datensätze vor, indem er den Datensatz bei 10 Prozent der Handler-Aufrufe zurückgibt und andernfalls eine leere Liste ausgibt.

import random def preprocess_handler(inference_record): # we set up a sampling rate of 0.1 if random.random() > 0.1: # return an empty list return [] input_data = inference_record.endpoint_input.data return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}

Benutzerdefiniertes Logging für das Vorverarbeitungsskript

Wenn Ihr Vorverarbeitungsskript einen Fehler zurückgibt, überprüfen Sie zum CloudWatch Debuggen die protokollierten Ausnahmemeldungen. Sie können CloudWatch über die Schnittstelle auf den Logger zugreifen. preprocess_handler Sie können alle Informationen, die Sie benötigen, aus Ihrem Skript protokollieren CloudWatch. Dies kann beim Debuggen Ihres Vorverarbeitungsskripts nützlich sein. Das folgende Beispiel zeigt, wie Sie die preprocess_handler Schnittstelle verwenden können, um sich anzumelden CloudWatch

def preprocess_handler(inference_record, logger): logger.info(f"I'm a processing record: {inference_record}") logger.debug(f"I'm debugging a processing record: {inference_record}") logger.warning(f"I'm processing record with missing value: {inference_record}") logger.error(f"I'm a processing record with bad value: {inference_record}") return inference_record

Nachbearbeitungsskript

Verwenden Sie ein Postprocessing-Skript, wenn Sie den Code nach einem erfolgreichen Überwachungslauf erweitern möchten.

def postprocess_handler(): print("Hello from post-proc script!")