前処理と後処理 - Amazon SageMaker AI

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

前処理と後処理

カスタムの前処理と後処理の Python スクリプトを使用して、入力をモデルモニターに変換したり、モニタリングが正常に実行された後にコードを拡張したりできます。これらのスクリプトを Amazon S3 にアップロードし、モデルモニターを作成するときに参照します。

次の例は、前処理スクリプトと後処理スクリプトでモニタリングスケジュールをカスタマイズする方法を示しています。ユーザープレースホルダーテキストを自分の情報に置き換えます。

import boto3, os from sagemaker import get_execution_role, Session from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor # Upload pre and postprocessor scripts session = Session() bucket = boto3.Session().resource("s3").Bucket(session.default_bucket()) prefix = "demo-sagemaker-model-monitor" pre_processor_script = bucket.Object(os.path.join(prefix, "preprocessor.py")).upload_file("preprocessor.py") post_processor_script = bucket.Object(os.path.join(prefix, "postprocessor.py")).upload_file("postprocessor.py") # Get execution role role = get_execution_role() # can be an empty string # Instance type instance_type = "instance-type" # instance_type = "ml.m5.xlarge" # Example # Create a monitoring schedule with pre and postprocessing my_default_monitor = DefaultModelMonitor( role=role, instance_count=1, instance_type=instance_type, volume_size_in_gb=20, max_runtime_in_seconds=3600, ) s3_report_path = "s3://{}/{}".format(bucket, "reports") monitor_schedule_name = "monitor-schedule-name" endpoint_name = "endpoint-name" my_default_monitor.create_monitoring_schedule( post_analytics_processor_script=post_processor_script, record_preprocessor_script=pre_processor_script, monitor_schedule_name=monitor_schedule_name, # use endpoint_input for real-time endpoint endpoint_input=endpoint_name, # or use batch_transform_input for batch transform jobs # batch_transform_input=batch_transform_name, output_s3_uri=s3_report_path, statistics=my_default_monitor.baseline_statistics(), constraints=my_default_monitor.suggested_constraints(), schedule_cron_expression=CronExpressionGenerator.hourly(), enable_cloudwatch_metrics=True, )

前処理スクリプト

モデルモニターへの入力を変換する必要がある場合は、前処理スクリプトを使用します。

例えば、モデルの出力が配列 [1.0, 2.1] だとします。Amazon SageMaker Model Monitor コンテナは、表形式または {prediction0”: 1.0, “prediction1” : 2.1} のようにフラット化された JSON 構造でのみ動作します。次のような前処理スクリプトを使用して、配列を正しい JSON 構造に変換することもできます。

def preprocess_handler(inference_record): input_data = inference_record.endpoint_input.data output_data = inference_record.endpoint_output.data.rstrip("\n") data = output_data + "," + input_data return { str(i).zfill(20) : d for i, d in enumerate(data.split(",")) }

別の例として、モデルにオプション機能があり、そのオプション機能に欠測値があることを示すために -1 を使用するとします。データ品質モニターを使用している場合は、モニターのメトリクス計算に含まれないように、入力値の配列から -1 を削除したほうがよい場合があります。次のようなスクリプトを使用して、これらの値を削除できます。

def preprocess_handler(inference_record): input_data = inference_record.endpoint_input.data return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}

前処理スクリプトは inference_record を唯一の入力として受け取ります。次のコード スニペットは inference_record の例を示しています。

{ "captureData": { "endpointInput": { "observedContentType": "text/csv", "mode": "INPUT", "data": "132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1", "encoding": "CSV" }, "endpointOutput": { "observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT", "data": "0.01076381653547287", "encoding": "CSV" } }, "eventMetadata": { "eventId": "feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9", "inferenceTime": "2019-11-20T23:33:12Z" }, "eventVersion": "0" }

次のコードスニペットは inference_record の完全なクラス構造を示しています。

KEY_EVENT_METADATA = "eventMetadata" KEY_EVENT_METADATA_EVENT_ID = "eventId" KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime" KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes" KEY_EVENTDATA_ENCODING = "encoding" KEY_EVENTDATA_DATA = "data" KEY_GROUND_TRUTH_DATA = "groundTruthData" KEY_EVENTDATA = "captureData" KEY_EVENTDATA_ENDPOINT_INPUT = "endpointInput" KEY_EVENTDATA_ENDPOINT_OUTPUT = "endpointOutput" KEY_EVENTDATA_BATCH_OUTPUT = "batchTransformOutput" KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType" KEY_EVENTDATA_MODE = "mode" KEY_EVENT_VERSION = "eventVersion" class EventConfig: def __init__(self, endpoint, variant, start_time, end_time): self.endpoint = endpoint self.variant = variant self.start_time = start_time self.end_time = end_time class EventMetadata: def __init__(self, event_metadata_dict): self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None) self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None) self.custom_attribute = event_metadata_dict.get(KEY_EVENT_METADATA_CUSTOM_ATTR, None) class EventData: def __init__(self, data_dict): self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None) self.data = data_dict.get(KEY_EVENTDATA_DATA, None) self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None) self.mode = data_dict.get(KEY_EVENTDATA_MODE, None) def as_dict(self): ret = { KEY_EVENTDATA_ENCODING: self.encoding, KEY_EVENTDATA_DATA: self.data, KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType, } return ret class CapturedData: def __init__(self, event_dict): self.event_metadata = None self.endpoint_input = None self.endpoint_output = None self.batch_transform_output = None self.ground_truth = None self.event_version = None self.event_dict = event_dict self._event_dict_postprocessed = False if KEY_EVENT_METADATA in event_dict: self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA]) if KEY_EVENTDATA in event_dict: if KEY_EVENTDATA_ENDPOINT_INPUT in event_dict[KEY_EVENTDATA]: self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT]) if KEY_EVENTDATA_ENDPOINT_OUTPUT in event_dict[KEY_EVENTDATA]: self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT]) if KEY_EVENTDATA_BATCH_OUTPUT in event_dict[KEY_EVENTDATA]: self.batch_transform_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT]) if KEY_GROUND_TRUTH_DATA in event_dict: self.ground_truth = EventData(event_dict[KEY_GROUND_TRUTH_DATA]) if KEY_EVENT_VERSION in event_dict: self.event_version = event_dict[KEY_EVENT_VERSION] def as_dict(self): if self._event_dict_postprocessed is True: return self.event_dict if KEY_EVENTDATA in self.event_dict: if KEY_EVENTDATA_ENDPOINT_INPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT] = self.endpoint_input.as_dict() if KEY_EVENTDATA_ENDPOINT_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][ KEY_EVENTDATA_ENDPOINT_OUTPUT ] = self.endpoint_output.as_dict() if KEY_EVENTDATA_BATCH_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT] = self.batch_transform_output.as_dict() self._event_dict_postprocessed = True return self.event_dict def __str__(self): return str(self.as_dict())

カスタムサンプリング

前処理スクリプトにカスタムサンプリング戦略を適用することもできます。そのためには、Model Monitor のファーストパーティのビルド済みコンテナを設定して、指定したサンプリングレートに従って一部のレコードを無視するようにします。次の例では、ハンドラーはハンドラー呼び出しの 10% のレコードを返し、それ以外の場合は空のリストを返すことで、レコードの 10% をサンプリングします。

import random def preprocess_handler(inference_record): # we set up a sampling rate of 0.1 if random.random() > 0.1: # return an empty list return [] input_data = inference_record.endpoint_input.data return {i : None if x == -1 else x for i, x in enumerate(input_data.split(","))}

前処理スクリプト用のカスタムロギング

前処理スクリプトがエラーを返す場合は、CloudWatch に記録された例外メッセージを確認してデバッグします。preprocess_handler インターフェイスを通じて CloudWatch のロガーにアクセスできます。スクリプトから必要な情報をすべて CloudWatch に記録できます。これは前処理スクリプトをデバッグするときに役立ちます。次の例は、preprocess_handler インターフェイスを使用して CloudWatch にログを記録する方法を示しています。

def preprocess_handler(inference_record, logger): logger.info(f"I'm a processing record: {inference_record}") logger.debug(f"I'm debugging a processing record: {inference_record}") logger.warning(f"I'm processing record with missing value: {inference_record}") logger.error(f"I'm a processing record with bad value: {inference_record}") return inference_record

後処理スクリプト

モニタリングが正常に実行された後にコードを拡張したい場合は、後処理スクリプトを使用します。

def postprocess_handler(): print("Hello from post-proc script!")