사전 처리 및 사후 처리 - Amazon SageMaker

사전 처리 및 사후 처리

사용자 지정 사전 처리 및 사후 처리 Python 스크립트를 사용하여 모델 모니터에 대한 입력을 변환하거나 모니터링이 성공적으로 실행된 후 코드를 확장할 수 있습니다. 이 스크립트를 Amazon S3에 업로드한 다음 작업할 모델 모니터를 생성할 때 이를 참조하세요.

다음 예제는 사전 처리 스크립트와 사후 처리 스크립트를 사용하여 모니터링 일정을 사용자 지정하는 방법을 보여줍니다. user placeholder text를 사용자의 정보로 바꿉니다.

import boto3, os from sagemaker import get_execution_role, Session from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor # Upload pre and postprocessor scripts session = Session() bucket = boto3.Session().resource("s3").Bucket(session.default_bucket()) prefix = "demo-sagemaker-model-monitor" pre_processor_script = bucket.Object(os.path.join(prefix, "preprocessor.py")).upload_file("preprocessor.py") post_processor_script = bucket.Object(os.path.join(prefix, "postprocessor.py")).upload_file("postprocessor.py") # Get execution role role = get_execution_role() # can be an empty string # Instance type instance_type = "instance-type" # instance_type = "ml.m5.xlarge" # Example # Create a monitoring schedule with pre and postprocessing my_default_monitor = DefaultModelMonitor( role=role, instance_count=1, instance_type=instance_type, volume_size_in_gb=20, max_runtime_in_seconds=3600, ) s3_report_path = "s3://{}/{}".format(bucket, "reports") monitor_schedule_name = "monitor-schedule-name" endpoint_name = "endpoint-name" my_default_monitor.create_monitoring_schedule( post_analytics_processor_script=post_processor_script, record_preprocessor_script=pre_processor_script, monitor_schedule_name=monitor_schedule_name, # use endpoint_input for real-time endpoint endpoint_input=endpoint_name, # or use batch_transform_input for batch transform jobs # batch_transform_input=batch_transform_name, output_s3_uri=s3_report_path, statistics=my_default_monitor.baseline_statistics(), constraints=my_default_monitor.suggested_constraints(), schedule_cron_expression=CronExpressionGenerator.hourly(), enable_cloudwatch_metrics=True, )

사전 처리 스크립트

모델 모니터에 대한 입력을 변환해야 하는 경우 사전 처리 스크립트를 사용하세요.

예를 들어, 모델의 출력이 [1.0, 2.1]배열이라고 가정해 보겠습니다. Amazon SageMaker Model Monitor 컨테이너는 {prediction0”: 1.0, “prediction1” : 2.1}와 같은 형태로 테이블 형식 또는 평면화된 JSON 구조에서만 작동합니다. 해당 배열은 다음과 같은 사전 처리 스크립트를 사용하여 올바른 JSON 구조로 변환이 가능합니다.

def preprocess_handler(inference_record): input_data = inference_record.endpoint_input.data output_data = inference_record.endpoint_output.data.rstrip("\n") data = output_data + "," + input_data return { str(i).zfill(20) : d for i, d in enumerate(data.split(",")) }

또 다른 예로, 작업 중인 모델에 선택사항 특징이 있는데, 해당 선택사항 특징에 누락된 값이 있음을 나타내기 위해 -1을 사용했다고 가정해 보겠습니다. 데이터 품질 모니터가 있는 경우, 입력 값 배열에서 -1을 제거하여 모니터의 지표 계산에 포함되지 않도록 하는 것이 좋습니다. 다음과 같은 스크립트를 사용하여 이러한 값을 제거할 수 있습니다.

def preprocess_handler(inference_record): input_data = inference_record.endpoint_input.data return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}

사전 처리 스크립트는 inference_record를 유일한 입력으로 받아들입니다. 다음 코드 조각은 inference_record의 예제를 보여줍니다.

{ "captureData": { "endpointInput": { "observedContentType": "text/csv", "mode": "INPUT", "data": "132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1", "encoding": "CSV" }, "endpointOutput": { "observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT", "data": "0.01076381653547287", "encoding": "CSV" } }, "eventMetadata": { "eventId": "feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9", "inferenceTime": "2019-11-20T23:33:12Z" }, "eventVersion": "0" }

다음 코드 조각은 inference_record의 전체 클래스 구조를 보여줍니다.

KEY_EVENT_METADATA = "eventMetadata" KEY_EVENT_METADATA_EVENT_ID = "eventId" KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime" KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes" KEY_EVENTDATA_ENCODING = "encoding" KEY_EVENTDATA_DATA = "data" KEY_GROUND_TRUTH_DATA = "groundTruthData" KEY_EVENTDATA = "captureData" KEY_EVENTDATA_ENDPOINT_INPUT = "endpointInput" KEY_EVENTDATA_ENDPOINT_OUTPUT = "endpointOutput" KEY_EVENTDATA_BATCH_OUTPUT = "batchTransformOutput" KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType" KEY_EVENTDATA_MODE = "mode" KEY_EVENT_VERSION = "eventVersion" class EventConfig: def __init__(self, endpoint, variant, start_time, end_time): self.endpoint = endpoint self.variant = variant self.start_time = start_time self.end_time = end_time class EventMetadata: def __init__(self, event_metadata_dict): self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None) self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None) self.custom_attribute = event_metadata_dict.get(KEY_EVENT_METADATA_CUSTOM_ATTR, None) class EventData: def __init__(self, data_dict): self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None) self.data = data_dict.get(KEY_EVENTDATA_DATA, None) self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None) self.mode = data_dict.get(KEY_EVENTDATA_MODE, None) def as_dict(self): ret = { KEY_EVENTDATA_ENCODING: self.encoding, KEY_EVENTDATA_DATA: self.data, KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType, } return ret class CapturedData: def __init__(self, event_dict): self.event_metadata = None self.endpoint_input = None self.endpoint_output = None self.batch_transform_output = None self.ground_truth = None self.event_version = None self.event_dict = event_dict self._event_dict_postprocessed = False if KEY_EVENT_METADATA in event_dict: self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA]) if KEY_EVENTDATA in event_dict: if KEY_EVENTDATA_ENDPOINT_INPUT in event_dict[KEY_EVENTDATA]: self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT]) if KEY_EVENTDATA_ENDPOINT_OUTPUT in event_dict[KEY_EVENTDATA]: self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT]) if KEY_EVENTDATA_BATCH_OUTPUT in event_dict[KEY_EVENTDATA]: self.batch_transform_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT]) if KEY_GROUND_TRUTH_DATA in event_dict: self.ground_truth = EventData(event_dict[KEY_GROUND_TRUTH_DATA]) if KEY_EVENT_VERSION in event_dict: self.event_version = event_dict[KEY_EVENT_VERSION] def as_dict(self): if self._event_dict_postprocessed is True: return self.event_dict if KEY_EVENTDATA in self.event_dict: if KEY_EVENTDATA_ENDPOINT_INPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT] = self.endpoint_input.as_dict() if KEY_EVENTDATA_ENDPOINT_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][ KEY_EVENTDATA_ENDPOINT_OUTPUT ] = self.endpoint_output.as_dict() if KEY_EVENTDATA_BATCH_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT] = self.batch_transform_output.as_dict() self._event_dict_postprocessed = True return self.event_dict def __str__(self): return str(self.as_dict())

사용자 지정 샘플링

사전 처리 스크립트에 사용자 지정 샘플링 전략을 적용할 수도 있습니다. 이렇게 하려면, Model Monitor의 사전 구축된 자사 컨테이너를 구성하여 사용자가 지정한 샘플링 속도에 따라 해당 레코드의 백분율을 무시하도록 하세요. 다음 예제에서 핸들러는 핸들러 호출의 10%에서 레코드를 반환하고 나머지에 대해서는 빈 목록을 반환함으로써 레코드의 10%를 샘플링합니다.

import random def preprocess_handler(inference_record): # we set up a sampling rate of 0.1 if random.random() > 0.1: # return an empty list return [] input_data = inference_record.endpoint_input.data return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}

사전 처리 스크립트에 대한 사용자 지정 로깅

사전 처리 스크립트에서 오류가 반환되는 경우 CloudWatch에 기록된 예외 메시지를 확인하여 디버깅하세요. 사용자는 preprocess_handler인터페이스를 통해 CloudWatch의 로거에 액세스할 수 있습니다. 해당 스크립트에서 필요한 모든 정보를 CloudWatch에 기록할 수 있습니다. 이는 사전 처리 스크립트를 디버깅할 때 유용할 수 있습니다. 다음 예제는 CloudWatch로 기록하기 위해 preprocess_handler인터페이스를 사용하는 방법을 보여줍니다.

def preprocess_handler(inference_record, logger): logger.info(f"I'm a processing record: {inference_record}") logger.debug(f"I'm debugging a processing record: {inference_record}") logger.warning(f"I'm processing record with missing value: {inference_record}") logger.error(f"I'm a processing record with bad value: {inference_record}") return inference_record

사후 처리 스크립트

모니터링을 성공적으로 실행한 후 코드를 확장하려면 사후 처리 스크립트를 사용하세요.

def postprocess_handler(): print("Hello from post-proc script!")