本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
如果您使用的是实时终端节点,则可以使用 AWS CloudFormation 自定义资源来创建监控计划。该自定义资源使用 Python 编写。要部署它,请参阅 Python Lambda 部署。
自定义资源
首先向 AWS CloudFormation 模板添加自定义资源。这指向您下一步将创建的 AWS Lambda 函数。
此资源使您可以自定义监控计划的参数。您可以通过修改以下示例资源中的 AWS CloudFormation 资源和 Lambda 函数来添加或删除更多参数。
{
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"MonitoringSchedule": {
"Type": "Custom::MonitoringSchedule",
"Version": "1.0",
"Properties": {
"ServiceToken": "arn:aws:lambda:us-west-2:111111111111:function:lambda-name",
"ScheduleName": "YourScheduleName",
"EndpointName": "YourEndpointName",
"BaselineConstraintsUri": "s3://your-baseline-constraints/constraints.json",
"BaselineStatisticsUri": "s3://your-baseline-stats/statistics.json",
"PostAnalyticsProcessorSourceUri": "s3://your-post-processor/postprocessor.py",
"RecordPreprocessorSourceUri": "s3://your-preprocessor/preprocessor.py",
"InputLocalPath": "/opt/ml/processing/endpointdata",
"OutputLocalPath": "/opt/ml/processing/localpath",
"OutputS3URI": "s3://your-output-uri",
"ImageURI": "111111111111.dkr.ecr.us-west-2.amazonaws.com/your-image",
"ScheduleExpression": "cron(0 * ? * * *)",
"PassRoleArn": "arn:aws:iam::111111111111:role/AmazonSageMaker-ExecutionRole"
}
}
}
}
Lambda 自定义资源代码
此 AWS CloudFormation 自定义资源使用自定义资源助手 (Custom Resource Helper) AWS 库,您可以通过 pip 执行 pip install crhelper 来安装该库。
此 Lambda 函数由 AWS CloudFormation 在创建和删除堆栈期间调用。此 Lambda 函数负责创建和删除监控计划,并使用上一部分中描述的自定义资源中定义的参数。
import boto3
import botocore
import logging
from crhelper import CfnResource
from botocore.exceptions import ClientError
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# SageMaker client used by the helper functions below to create, describe
# and delete monitoring schedules.
sm = boto3.client('sagemaker')
# crhelper makes it easier to implement a CloudFormation custom resource
helper = CfnResource()
# CFN Handlers
def handler(event, context):
    """
    Lambda entry point: hand the incoming event and context to the crhelper
    resource object, which invokes the handler registered for the event.
    """
    helper(event, context)
@helper.create
def create_handler(event, context):
    """
    Handle the create event of the custom resource by creating the
    monitoring schedule described by the event's resource properties.
    """
    # context is unused; everything needed comes from the event.
    create_monitoring_schedule(event)
@helper.delete
def delete_handler(event, context):
    """
    Handle the delete event of the custom resource by deleting the
    monitoring schedule named in the event's resource properties.
    """
    delete_monitoring_schedule(get_schedule_name(event))
@helper.poll_create
def poll_create(event, context):
    """
    Report whether the monitoring schedule has finished being created.

    Returns True once the schedule is ready and False otherwise, so that
    CloudFormation polls again.
    """
    name = get_schedule_name(event)
    logger.info('Polling for creation of schedule: %s', name)
    ready = is_schedule_ready(name)
    return ready
@helper.update
def noop(event=None, context=None):
    """
    Handle the update event of the custom resource without doing anything.

    Updates are not currently implemented, but crhelper will throw an error
    if no update handler is registered. The handler must accept the
    (event, context) pair that crhelper passes to registered handlers; a
    zero-argument function would raise TypeError on every stack update.
    Both parameters default to None so existing zero-argument calls keep
    working.
    """
    pass
# Helper Functions
def get_schedule_name(event):
    """Return the 'ScheduleName' entry of the event's 'ResourceProperties'."""
    resource_properties = event['ResourceProperties']
    return resource_properties['ScheduleName']
def create_monitoring_schedule(event):
    """
    Create a SageMaker monitoring schedule from the custom resource event.

    The schedule name and the schedule configuration are both derived from
    the event's resource properties.
    """
    name = get_schedule_name(event)
    config = create_monitoring_schedule_config(event)

    logger.info('Creating monitoring schedule with name: %s', name)

    request = {
        'MonitoringScheduleName': name,
        'MonitoringScheduleConfig': config,
    }
    sm.create_monitoring_schedule(**request)
def is_schedule_ready(schedule_name):
    """
    Check the status of the named monitoring schedule.

    Returns True when the status is 'Scheduled' and False when it is
    'Pending'. Any other status raises an Exception naming the schedule
    and the status that was seen.
    """
    description = sm.describe_monitoring_schedule(MonitoringScheduleName=schedule_name)
    current_status = description['MonitoringScheduleStatus']

    if current_status == 'Scheduled':
        logger.info('Monitoring schedule (%s) is ready', schedule_name)
        return True

    if current_status == 'Pending':
        logger.info('Monitoring schedule (%s) still creating, waiting and polling again...', schedule_name)
        return False

    raise Exception('Monitoring schedule ({}) has unexpected status: {}'.format(schedule_name, current_status))
def create_monitoring_schedule_config(event):
    """
    Build the MonitoringScheduleConfig dict from the custom resource event.

    Reads these keys from event['ResourceProperties']: ScheduleExpression,
    BaselineConstraintsUri, BaselineStatisticsUri, EndpointName,
    InputLocalPath, OutputS3URI, OutputLocalPath, ImageURI,
    RecordPreprocessorSourceUri, PostAnalyticsProcessorSourceUri and
    PassRoleArn. A missing key raises KeyError.

    The cluster configuration (one ml.t3.medium instance, 50 GB volume) and
    the 300 second maximum runtime are fixed values.
    """
    props = event['ResourceProperties']
    return {
        "ScheduleConfig": {
            "ScheduleExpression": props["ScheduleExpression"],
        },
        "MonitoringJobDefinition": {
            "BaselineConfig": {
                "ConstraintsResource": {
                    "S3Uri": props['BaselineConstraintsUri'],
                },
                "StatisticsResource": {
                    "S3Uri": props['BaselineStatisticsUri'],
                }
            },
            "MonitoringInputs": [
                {
                    "EndpointInput": {
                        "EndpointName": props["EndpointName"],
                        "LocalPath": props["InputLocalPath"],
                    }
                }
            ],
            "MonitoringOutputConfig": {
                "MonitoringOutputs": [
                    {
                        "S3Output": {
                            "S3Uri": props["OutputS3URI"],
                            "LocalPath": props["OutputLocalPath"],
                        }
                    }
                ],
            },
            "MonitoringResources": {
                "ClusterConfig": {
                    "InstanceCount": 1,
                    "InstanceType": "ml.t3.medium",
                    "VolumeSizeInGB": 50,
                }
            },
            "MonitoringAppSpecification": {
                "ImageUri": props["ImageURI"],
                # Each script comes from its own property; the preprocessor
                # was previously filled with the post-processor URI.
                "RecordPreprocessorSourceUri": props['RecordPreprocessorSourceUri'],
                "PostAnalyticsProcessorSourceUri": props['PostAnalyticsProcessorSourceUri'],
            },
            "StoppingCondition": {
                "MaxRuntimeInSeconds": 300
            },
            "RoleArn": props["PassRoleArn"],
        }
    }
def delete_monitoring_schedule(schedule_name):
    """
    Delete the named monitoring schedule.

    A ClientError whose error code is 'ResourceNotFound' is logged and
    ignored, since there is nothing to delete. Any other ClientError is
    logged and re-raised.
    """
    logger.info('Deleting schedule: %s', schedule_name)
    try:
        sm.delete_monitoring_schedule(MonitoringScheduleName=schedule_name)
    except ClientError as err:
        error_code = err.response['Error']['Code']
        if error_code != 'ResourceNotFound':
            logger.error('Unexpected error while trying to delete monitoring schedule')
            raise err
        logger.info('Resource not found, nothing to delete')