增加一个步骤
下文介绍了每种步骤类型的要求,并提供了步骤的实现示例,以及如何将步骤添加到 Pipelines 中。这些都不是可行的实施方案,因为它们没有提供所需的资源和投入。有关实施这些步骤的教程,请参阅Pipelines 操作。
注意
您还可以使用 @step
装饰器将本地机器学习代码转换为 Pipelines 步骤,从而从本地机器学习代码中创建步骤。有关更多信息,请参阅 @step decorator。
Amazon SageMaker Pipelines 支持以下步骤类型:
@step decorator
如果您想在拖放式 Pipelines UI 中编排利用高级 SageMaker 功能或其他 AWS 服务的自定义 ML 作业,请使用 执行代码步骤。
您可以使用 @step
装饰器从本地机器学习代码中创建一个步骤。测试代码后,您可以使用 @step
装饰器注释该函数,将其转换为 SageMaker 管道步骤。当您将 @step
装饰功能的输出作为一个步骤传递给管道时,Pipelines 会创建并运行管道。您还可以创建一个多步骤 DAG 管道,其中包括一个或多个 @step
装饰函数以及传统的 SageMaker 管道步骤。有关如何使用 @step
装饰器创建步骤的更多详情,请参阅 使用 @step 装饰器升降 Python 代码。
在 Pipelines 拖放用户界面中,您可以使用执行代码步骤将自己的代码作为管道步骤运行。您可以上传一个 Python 函数、脚本或笔记本,作为管道的一部分执行。如果您想编排利用高级 SageMaker 功能或其他 AWS 服务的自定义 ML 作业,则应使用此步骤。
执行代码步骤将文件上传到 Amazon SageMaker 的默认 Amazon S3 存储桶。该存储桶可能未设置所需的跨源资源共享 (CORS) 权限。要了解有关配置 CORS 权限的更多信息,请参阅 输入映像数据的 CORS 要求。
执行代码步骤使用 Amazon SageMaker 训练作业来运行代码。确保您的 IAM 角色具有 sagemaker:DescribeTrainingJob
和 sagemaker:CreateTrainingJob
API 权限。要进一步了解 Amazon SageMaker 所需的所有权限以及如何设置这些权限,请参阅 Amazon SageMaker API 权限:操作、权限和资源参考。
要使用管道设计器向管道添加执行代码步骤,请执行以下操作:
-
按照 启动 Amazon SageMaker Studio 中的说明打开 Amazon SageMaker Studio 管理控制台。
-
在左侧导航窗格中,选择 Pipelines。
-
选择创建。
-
选择空白。
-
在左侧边栏中选择执行代码,然后将其拖到画布上。
-
在画布中,选择添加的执行代码步骤。
-
在右侧边栏中,填写设置和详情标签中的表格。
-
您可以上传单个文件来执行,也可以上传包含多个构件的压缩文件夹。
-
对于单个文件上传,您可以为笔记本、python 函数或脚本提供可选参数。
-
在提供 Python 函数时,必须以
file.py:
格式提供处理程序。<function_name>
-
对于上传压缩文件夹,必须提供代码的相对路径,您还可以选择提供压缩文件夹内
requirements.txt
文件或初始化脚本的路径。 -
如果画布上有任何步骤紧接在您添加的执行代码步骤之前,请单击并拖动光标从该步骤到执行代码步骤,以创建边缘。
-
如果画布中包含任何紧接您添加的执行代码步骤的步骤,请单击并拖动光标从执行代码步骤到该步骤,以创建边缘。可以引用执行代码步骤的输出作为 Python 函数。
使用处理步骤创建用于数据处理的处理作业。有关处理作业的更多信息,请参阅处理数据和评估模型。
您可以使用训练步骤创建训练作业来训练模型。有关训练作业的更多信息,请参阅使用 Amazon SageMaker 训练模型。
训练步骤需要估算器以及训练和验证数据输入。
您可以使用优化步骤来创建超参数优化作业,也称为超参数优化 (HPO)。超参数调整作业运行多个训练作业,每个作业产生一个模型版本。有关超参数优化的更多信息,请参阅使用 SageMaker 自动调整模型。
优化作业与管道的 SageMaker 实验相关联,而训练作业作为试验创建。有关更多信息,请参阅 实验集成。
优化步骤需要 HyperparameterTunerHyperparameterTuner
的 warm_start_config
参数来再训练以前的优化作业。有关超参数优化和温启动的更多信息,请参阅运行热启动超参数调优作业。
您可以使用 sagemaker.workflow.steps.TuningStep
重要
Amazon SageMaker Python SDK v2.48.0 和 Amazon SageMaker Studio Classic v3.8.0 中引入了调整步骤。使用调整步骤前必须更新 Studio Classic,否则不会显示管道 DAG。要更新 Studio Classic,请参阅 关闭并更新 SageMaker Studio Classic。
以下示例说明如何创建 TuningStep
定义。
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.tuner import HyperparameterTuner from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TuningStep tuner = HyperparameterTuner(..., sagemaker_session=PipelineSession()) step_tuning = TuningStep( name = "HPTuning", step_args = tuner.fit(inputs=TrainingInput(s3_data="s3://
amzn-s3-demo-bucket/my-data
")) )
获取最佳模型版本
以下示例说明如何使用 get_top_model_s3_uri
方法从优化作业中获取最佳模型版本。根据 HyperParameterTuningJobObjective 排名,最多提供前 50 个性能卓越的版本。top_k
参数是版本的索引,其中 top_k=0
是性能最好的版本,top_k=49
是性能最差的版本。
best_model = Model( image_uri=image_uri, model_data=step_tuning.get_top_model_s3_uri( top_k=0, s3_bucket=sagemaker_session.default_bucket() ), ... )
有关优化步骤要求的更多信息,请参阅 sagemaker.workflow.steps.TuningStep
微调功能可在新数据集上训练来自 Amazon SageMaker JumpStart 的预训练基础模型。这个过程也称为转移学习,可以使用较小数据集和较短的训练时间生成准确模型。对模型进行微调时,可以使用默认数据集,也可以选择自己的数据。要了解有关从 JumpStart 微调基础模型的更多信息,请参阅 微调模型。
微调步骤使用 Amazon SageMaker 训练作业来定制模型。确保您的 IAM 角色具有 sagemaker:DescribeTrainingJob
和 sagemaker:CreateTrainingJob
API 权限,以便在管道中执行微调作业。API 权限,以便在管道中执行微调作业。要了解有关 Amazon SageMaker 所需权限以及如何设置权限的更多信息,请参阅 Amazon SageMaker API 权限:操作、权限和资源参考。
要使用拖放编辑器在管道中添加微调模型步骤,请按照以下步骤操作:
-
按照 启动 Amazon SageMaker Studio 中的说明打开 Studio 管理控制台。
-
在左侧导航窗格中,选择 Pipelines。
-
选择创建。
-
选择空白。
-
在左侧边栏中选择微调模型,然后将其拖动到画布上。
-
在画布中,选择添加的微调模型步骤。
-
在右侧边栏中,填写设置和详情标签中的表格。
-
如果画布上有任何步骤紧接在您添加的微调模型步骤之前,请单击并拖动光标从该步骤到微调模型步骤,以创建边缘。
-
如果画布上有任何步骤紧接着您添加的微调模型步骤,请单击并拖动光标从微调模型步骤到该步骤,以创建边缘。
使用 AutoML
注意
目前,AutoML 步骤仅支持组合训练模式。
以下示例说明如何使用 AutoMLStep
创建定义。
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.automl_step import AutoMLStep pipeline_session = PipelineSession() auto_ml = AutoML(..., role="
<role>
", target_attribute_name="my_target_attribute_name
", mode="ENSEMBLING
", sagemaker_session=pipeline_session) input_training = AutoMLInput( inputs="s3://amzn-s3-demo-bucket/my-training-data
", target_attribute_name="my_target_attribute_name
", channel_type="training", ) input_validation = AutoMLInput( inputs="s3://amzn-s3-demo-bucket/my-validation-data
", target_attribute_name="my_target_attribute_name
", channel_type="validation", ) step_args = auto_ml.fit( inputs=[input_training, input_validation] ) step_automl = AutoMLStep( name="AutoMLStep", step_args=step_args, )
获取最佳模型版本
AutoML 步骤会自动训练多个候选模型。使用 get_best_auto_ml_model
方法从 AutoML 作业中获取目标指标最佳的模型,如下所示。您还必须使用 IAM role
访问模型构件。
best_model = step_automl.get_best_auto_ml_model(
role=<role>
)
有关更多信息,请参阅 SageMaker Python SDK 中的 AutoML
使用 ModelStep
创建或注册 SageMaker 模型。有关 ModelStep
要求的更多信息,请参阅 sagemaker.workflow.model_step.ModelStep
创建模型
您可以使用 ModelStep
创建 SageMaker 模型。ModelStep
需要模型构件和有关 SageMaker 实例类型的信息,您需要使用这些信息来创建模型。有关 SageMaker 模型的更多信息,请参阅使用 Amazon SageMaker 训练一个模型。
以下示例说明如何创建 ModelStep
定义。
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.model import Model from sagemaker.workflow.model_step import ModelStep step_train = TrainingStep(...) model = Model( image_uri=pytorch_estimator.training_image_uri(), model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=PipelineSession(), role=role, ) step_model_create = ModelStep( name="MyModelCreationStep", step_args=model.create(instance_type="ml.m5.xlarge"), )
注册模型
您可以通过 Amazon SageMaker 模型注册表,使用 ModelStep
注册 sagemaker.model.Model
或 sagemaker.pipeline.PipelineModel
。PipelineModel
表示推理管道,是一个由处理推理请求的容器线性序列组成的模型。有关如何注册模型的更多信息,请参阅利用模型注册中心进行模型注册部署。
以下示例说明如何创建一个注册 PipelineModel
的 ModelStep
。
import time from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel pipeline_session = PipelineSession() code_location = 's3://{0}/{1}/code'.format(
bucket_name
, prefix) sklearn_model = SKLearnModel( model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=pipeline_session, py_version='py3' ) xgboost_model = XGBoostModel( model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=pipeline_session, role=role ) from sagemaker.workflow.model_step import ModelStep from sagemaker import PipelineModel pipeline_model = PipelineModel( models=[sklearn_model, xgboost_model], role=role,sagemaker_session=pipeline_session, ) register_model_step_args = pipeline_model.register( content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', ) step_model_registration = ModelStep( name="AbaloneRegisterModel", step_args=register_model_step_args, )
您使用创建模型步骤来创建 SageMaker 模型。有关 SageMaker 模型的更多信息,请参阅 使用 Amazon SageMaker 训练模型。
创建模型步骤需要模型构件和有关 SageMaker 实例类型的信息,您需要使用这些信息来创建模型。下面的示例展示了如何创建创建模型步骤定义。有关创建模型步骤要求的更多信息,请参阅 sagemaker.workflow.steps.CreateModelStep
注册模型步骤将模型注册到 SageMaker 模型注册中心。
在管道设计器中,使用部署模型(端点)步骤将模型部署到端点。您可以创建一个新端点或使用现有端点。实时推理非常适合有实时、交互式、低延迟要求的推理工作负载。您可以将模型部署到 SageMaker 托管服务,并获得可用于推理的实时端点。这些端点可完全托管,并支持自动扩缩。要了解有关 SageMaker 实时推理的更多信息,请参阅 实时推理。
在管道中添加部署模型步骤之前,请确保您的 IAM 角色具有以下权限:
-
sagemaker:CreateModel
-
sagemaker:CreateEndpointConfig
-
sagemaker:CreateEndpoint
-
sagemaker:UpdateEndpoint
-
sagemaker:DescribeModel
-
sagemaker:DescribeEndpointConfig
-
sagemaker:DescribeEndpoint
要进一步了解 SageMaker 的所有必要权限以及如何设置这些权限,请参阅 Amazon SageMaker API 权限:操作、权限和资源参考。
要在拖放编辑器中为管道添加模型部署步骤,请完成以下步骤:
-
按照 启动 Amazon SageMaker Studio 中的说明打开 Studio 管理控制台。
-
在左侧导航窗格中,选择 Pipelines。
-
选择创建。
-
选择空白。
-
在左侧边栏中选择部署模型(端点),然后将其拖到画布上。
-
在画布中,选择添加的部署模型(端点)步骤。
-
在右侧边栏中,填写设置和详情标签中的表格。
-
如果画布上有任何步骤紧接在您添加的部署模型(端点)步骤之前,请单击并拖动光标,从该步骤到部署模型(端点)步骤,以创建边缘。
-
如果画布中包含任何紧接您添加的部署模型(端点)步骤的步骤,请单击并将光标从部署模型(端点)步骤拖动到该步骤,以创建边缘。
您可以使用转换步骤进行批量转换,在整个数据集上运行推理。有关批量转换的更多信息,请参阅利用推理管道进行批量转换。
转换步骤需要转换器和用于运行批量转换的数据。下面的示例显示了如何创建变换步骤定义。有关转换步骤要求的更多信息,请参阅 sagemaker.workflow.steps.TransformStep
您可以使用条件步骤来评估步骤属性的条件,以评估管道中下一步应该采取的操作。
需要一个条件步骤:
-
一个条件列表。
-
如果条件评估结果为
true
,则要运行的步骤列表。 -
如果条件评估结果为
false
,则要运行的步骤列表。
使用 Callback
步骤在工作流程中添加 Amazon SageMaker Pipelines 未直接提供的其他流程和 AWS 服务。当 Callback
步骤运行时,会发生以下过程:
-
Pipelines 向客户指定的 Amazon Simple Queue Service (Amazon SQS) 队列发送消息。该信息包含一个 Pipelines 生成的标记和客户提供的输入参数列表。发送信息后,Pipelines 会等待客户的回复。
-
客户从 Amazon SQS 队列中检索消息并启动他们的自定义流程。
-
流程结束后,客户会调用以下其中一个应用程序接口,并提交 Pipelines 生成的令牌:
-
SendPipelineExecutionStepSuccess,以及输出参数列表
-
-
API 调用会导致 Pipelines 继续执行管道流程或使流程失败。
有关 Callback
步骤要求的更多信息,请参阅 sagemaker.workflow.callback_step.CallbackStep
重要
在 Amazon SageMaker Python SDK v2.45.0 和 Amazon SageMaker Studio Classic v3.6.2 中引入了 Callback
步骤。在使用 Callback
步骤之前,您必须更新 Studio Classic,否则不会显示管道 DAG。要更新 Studio Classic,请参阅 关闭并更新 SageMaker Studio Classic。
下面的示例展示了上述程序的执行过程。
from sagemaker.workflow.callback_step import CallbackStep step_callback = CallbackStep( name="MyCallbackStep", sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue", inputs={...}, outputs=[...] ) callback_handler_code = ' import boto3 import json def handler(event, context): sagemaker_client=boto3.client("sagemaker") for record in event["Records"]: payload=json.loads(record["body"]) token=payload["token"] # Custom processing # Call SageMaker to complete the step sagemaker_client.send_pipeline_execution_step_success( CallbackToken=token, OutputParameters={...} ) '
注意
不得嵌套 CallbackStep
的输出参数。例如,如果您使用嵌套字典作为输出参数,则该字典将被视为单个字符串(例如 {"output1": "{\"nested_output1\":\"my-output\"}"}
)。如果您提供嵌套值,那么当您尝试引用某个输出参数时,SageMaker 会抛出一个不可重试的客户端错误。
停止行为
当 Callback
步骤运行时,管道进程不会停止。
在有正在运行的 Callback
步骤的管道进程上调用 StopPipelineExecution 时,Pipelines 会向 SQS 队列发送一条 Amazon SQS 消息。SQS 消息正文包含一个状态字段,该字段设置为 Stopping
。以下是 SQS 消息正文示例。
{ "token": "26vcYbeWsZ", "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a", "arguments": { "number": 5, "stringArg": "some-arg", "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv" }, "status": "Stopping" }
您应该在 Amazon SQS 消息用户中添加逻辑,以便在收到消息后采取任何必要的措施(例如,资源清理)。然后添加对 SendPipelineExecutionStepSuccess
或 SendPipelineExecutionStepFailure
的调用。
只有当 Pipelines 收到这些调用时,它才会停止管道进程。
您可以使用 Lambda 步骤来运行 AWS Lambda 函数。您可以运行现有 Lambda 函数,或者 SageMaker 可以创建并运行一个新的 Lambda 函数。有关展示如何在 SageMaker 管道中使用 Lambda 步骤的笔记本,请参阅 sagemaker-pipelines-lambda-step.ipynb
重要
Amazon SageMaker Python SDK v2.51.0 和 Amazon SageMaker Studio Classic v3.9.1 中引入了 Lambda 步骤。您必须在使用 Lambda 步骤之前更新 Studio Classic,否则不会显示管道 DAG。要更新 Studio Classic,请参阅 关闭并更新 SageMaker Studio Classic。
SageMaker 提供了 sagemaker.lambda_helper.LambdaLambda
具有以下签名。
Lambda( function_arn, # Only required argument to invoke an existing Lambda function # The following arguments are required to create a Lambda function: function_name, execution_role_arn, zipped_code_dir, # Specify either zipped_code_dir and s3_bucket, OR script s3_bucket, # S3 bucket where zipped_code_dir is uploaded script, # Path of Lambda function script handler, # Lambda handler specified as "lambda_script.lambda_handler" timeout, # Maximum time the Lambda function can run before the lambda step fails ... )
sagemaker.workflow.lambda_step.LambdaSteplambda_func
参数类型为 Lambda
。要调用现有 Lambda 函数,唯一的要求是将函数的 Amazon 资源名称 (ARN) 提供给 function_arn
。如果不提供 function_arn
的值,则必须指定 handler
和以下内容之一:
-
zipped_code_dir
- 压缩后的 Lambda 函数的路径s3_bucket
- 要上传zipped_code_dir
的 Amazon S3 存储桶 -
script
- Lambda 函数脚本文件的路径
以下示例说明如何创建调用现有 Lambda 函数的 Lambda
步骤定义。
from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda" ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )
以下示例说明如何创建 Lambda
步骤定义,该定义使用 Lambda 函数脚本创建和调用 Lambda 函数。
from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_name="split-dataset-lambda", execution_role_arn=execution_role_arn, script="lambda_script.py", handler="lambda_script.lambda_handler", ... ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )
输入和输出
如果您的 Lambda
函数有输入或输出,则还必须在 Lambda
步骤中定义这些输入或输出。
注意
不得嵌套输入和输出参数。例如,如果您使用嵌套字典作为输出参数,则该字典将被视为单个字符串(例如 {"output1": "{\"nested_output1\":\"my-output\"}"}
)。如果您提供嵌套值并尝试稍后再引用该值,则会引发不可重试的客户端错误。
定义 Lambda
步骤时,inputs
必须是键值对的字典。inputs
字典的每个值都必须是基元类型(字符串、整数或浮点数)。不支持嵌套对象。如果 inputs
值未定义,则默认为 None
。
outputs
值必须是键列表。这些键是指 Lambda
函数输出中定义的字典。比如 inputs
,这些键必须是基元类型,并且不支持嵌套对象。
超时和停止行为
Lambda
类有一个 timeout
参数,用于指定 Lambda 函数可以运行的最长时间。默认值为 120 秒,最大值为 10 分钟。如果在达到超时条件时 Lambda 函数正在运行,则 Lambda 步骤将失败;但 Lambda 函数会继续运行。
当 Lambda 步骤正在运行时,无法停止管道进程,因为无法停止 Lambda 步骤调用的 Lambda 函数。如果您在 Lambda 函数运行时停止进程,管道会等待函数完成或超时。这取决于哪个先发生。进程随即停止。如果 Lambda 函数完成,则管道进程状态为 Stopped
。如果达到超时,则管道进程状态为 Failed
。
您可以使用 ClarifyCheck
步骤对照先前的基准进行基准偏差检查,以进行偏差分析和实现模型可解释性。然后,您可以使用 model.register()
方法生成和注册基准,并使用 step_args
将该方法的输出传递给 模型步骤。Amazon SageMaker Model Monitor 可将这些漂移检查基线用于模型端点。因此,您无需单独提出基线建议。
ClarifyCheck
步骤还可以从模型注册表中提取偏差检查基准。ClarifyCheck
步骤使用 SageMaker Clarify 预置容器。该容器提供一系列模型监测功能,包括约束建议和根据给定基线进行约束验证。有关更多信息,请参阅 预构建 SageMaker Clarify 容器。
配置 ClarifyCheck 步骤
您可以将 ClarifyCheck
步骤配置为每次在管道中使用时仅执行以下一种类型的检查。
-
数据偏差检查
-
模型偏差检查
-
模型可解释性检查
为此,请将 clarify_check_config
参数设置为以下检查类型值之一:
-
DataBiasCheckConfig
-
ModelBiasCheckConfig
-
ModelExplainabilityCheckConfig
ClarifyCheck
步骤会启动一个处理作业,该作业会运行 SageMaker Clarify 预构建容器,并需要为检查和处理作业设置专用的配置。ClarifyCheckConfig
和 CheckJobConfig
是这些配置的辅助函数。这些辅助功能与 SageMaker Clarify 处理任务在检查模型偏差、数据偏差或模型可解释性时的计算方法一致。有关更多信息,请参阅 运行 SageMaker Clarify 处理作业以进行偏差分析和实现可解释性。
控制偏差检查的步骤行为
ClarifyCheck
步骤需要以下两个布尔标志来控制其行为:
-
skip_check
:此参数表示是否跳过针对先前基准的偏差检查。如果将其设置为False
,则配置的检查类型的先前基准必须可用。 -
register_new_baseline
:此参数表示是否可以通过步骤属性BaselineUsedForDriftCheckConstraints
访问新计算的基准。如果将其设置为False
,则配置的检查类型的先前基准也必须可用。这可以通过BaselineUsedForDriftCheckConstraints
属性访问。
有关更多信息,请参阅 在 Amazon SageMaker Pipelines 中使用 ClarifyCheck 和 QualityCheck 步骤进行基线计算、漂移检测和生命周期检查。
使用基准
您可以选择指定 model_package_group_name
来定位现有基线。然后,ClarifyCheck
步骤在模型软件包组中最新批准的模型软件包上提取 DriftCheckBaselines
。
或者,您可以通过 supplied_baseline_constraints
参数提供先前的基准。如果同时指定 model_package_group_name
和 supplied_baseline_constraints
,则 ClarifyCheck
步骤将使用 supplied_baseline_constraints
参数指定的基准。
有关使用 ClarifyCheck
步骤要求的更多信息,请参阅 Amazon SageMaker SDK for Python 中的 sagemaker.workflow.steps.ClarifyCheckStepClarifyCheck
步骤,请参阅 sagemaker-pipeline-model-monitor-clarify-steps.ipynb
例 创建 ClarifyCheck
步骤以进行数据偏差检查
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="
ml.c5.xlarge
", volume_size_in_gb=120
, sagemaker_session=sagemaker_session, ) data_bias_data_config = DataConfig( s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep
']), label=0, dataset_type="text/csv
", s3_analysis_config_output_path=data_bias_analysis_cfg_output_path, ) data_bias_config = BiasConfig( label_values_or_threshold=[15.0
], facet_name=[8
], facet_values_or_threshold=[[0.5
]] ) data_bias_check_config = DataBiasCheckConfig( data_config=data_bias_data_config, data_bias_config=data_bias_config, )h data_bias_check_step = ClarifyCheckStep( name="DataBiasCheckStep
", clarify_check_config=data_bias_check_config, check_job_config=check_job_config, skip_check=False, register_new_baseline=False supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json
", model_package_group_name="MyModelPackageGroup
" )
使用 QualityCheck
步骤对管道中的数据质量或模型质量进行基线建议和漂移检查。然后,您可以使用 model.register()
方法生成和注册基准,并使用 step_args
将该方法的输出传递给 模型步骤。
Model Monitor 可以将这些偏差检查基准用于模型端点,这样您就无需单独提出基准建议。QualityCheck
步骤还可以从模型注册表中提取偏差检查基准。QualityCheck
步骤利用 Amazon SageMaker Model Monitor 预置容器。该容器具有一系列模型监测功能,包括约束建议、统计数据生成和对照基线进行约束验证。有关更多信息,请参阅 Amazon SageMaker Model Monitor 预构建容器。
配置 QualityCheck 步骤
您可以对 QualityCheck
步骤进行配置,使其每次在管道中使用时只运行以下一种检查类型。
-
数据质量检查
-
模型质量检查
您可以通过将 quality_check_config
参数设置为以下检查类型值之一来实现此目的:
-
DataQualityCheckConfig
-
ModelQualityCheckConfig
QualityCheck
步骤会启动一个处理作业,该作业运行 Model Monitor 预构建容器,并且需要专门的用于检查和处理作业的配置。QualityCheckConfig
和 CheckJobConfig
是这些配置的辅助功能。这些辅助功能与模型监控器为模型质量或数据质量监控创建基线的方式一致。有关 Model Monitor 基准建议的更多信息,请参阅创建基准和创建模型质量基线。
控制偏差检查的步骤行为
QualityCheck
步骤需要以下两个布尔标志来控制其行为:
-
skip_check
:此参数表示是否跳过针对先前基准的偏差检查。如果将其设置为False
,则配置的检查类型的先前基准必须可用。 -
register_new_baseline
:此参数表示是否可以通过步骤属性BaselineUsedForDriftCheckConstraints
和BaselineUsedForDriftCheckStatistics
访问新计算的基准。如果将其设置为False
,则配置的检查类型的先前基准也必须可用。它们可以通过BaselineUsedForDriftCheckConstraints
和BaselineUsedForDriftCheckStatistics
属性进行访问。
有关更多信息,请参阅 在 Amazon SageMaker Pipelines 中使用 ClarifyCheck 和 QualityCheck 步骤进行基线计算、漂移检测和生命周期检查。
使用基准
您可以通过 supplied_baseline_statistics
和 supplied_baseline_constraints
参数直接指定上一条基线。您还可以指定 model_package_group_name
和 QualityCheck
步骤,在模型软件包组中最新批准的模型软件包上拉动 DriftCheckBaselines
。
指定以下内容时,QualityCheck
步骤将使用 supplied_baseline_constraints
和 supplied_baseline_statistics
在 QualityCheck
步骤的校验类型上指定的基线。
-
model_package_group_name
-
supplied_baseline_constraints
-
supplied_baseline_statistics
有关使用 QualityCheck
步骤要求的更多信息,请参阅 Amazon SageMaker SDK for Python 中的 sagemaker.workflow.steps.QualityCheckStepQualityCheck
步骤,请参阅 sagemaker-pipeline-model-monitor-clarify-steps.ipynb
例 创建 QualityCheck
步骤以进行数据质量检查
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="
ml.c5.xlarge
", volume_size_in_gb=120
, sagemaker_session=sagemaker_session, ) data_quality_check_config = DataQualityCheckConfig( baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train
"].S3Output.S3Uri, dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"), output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep']) ) data_quality_check_step = QualityCheckStep( name="DataQualityCheckStep", skip_check=False, register_new_baseline=False, quality_check_config=data_quality_check_config, check_job_config=check_job_config, supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json
", supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json
", model_package_group_name="MyModelPackageGroup
" )
使用 Amazon SageMaker Pipelines EMR 步骤:
-
在运行中的 Amazon EMR 集群上处理 Amazon EMR 步骤。
-
让管道为您创建和管理 Amazon EMR 集群。
有关 Amazon EMR 的更多信息,请参阅 Amazon EMR 入门。
EMR 步骤要求 EMRStepConfig
包括 Amazon EMR 集群使用的 JAR 文件的位置以及要传递的任何参数。如果您要在运行中的 EMR 集群上运行该步骤,还需提供 Amazon EMR 集群 ID。您还可以通过集群配置,在它为您创建、管理和终止的集群上运行 EMR 步骤。以下几节包括演示这两种方法的示例笔记本的示例和链接。
注意
-
EMR 步骤要求传递给管道的角色具有其他权限。将 AWS 托管式策略附加到管道角色:
AmazonSageMakerPipelinesIntegrations
到管道角色,或确保角色包含该策略中的权限。 -
EMR Serverless 不支持 EMR 步骤。EKS 上的 Amazon EMR 也不支持该功能。
-
如果您在运行中的集群上处理 EMR 步骤,则只能使用处于以下状态之一的集群:
-
STARTING
-
BOOTSTRAPPING
-
RUNNING
-
WAITING
-
-
如果您在正在运行的集群上处理 EMR 步骤,则在 EMR 集群上最多可以有 256 个处于
PENDING
状态的 EMR 步骤。提交的 EMR 步骤超过此限制会导致管道执行失败。您可以考虑使用管道步骤的重试策略。 -
您可以指定集群 ID 或集群配置,但不能同时指定两者。
-
EMR 步骤依靠 Amazon EventBridge 来监控 EMR 步骤或集群状态的更改。如果您在正在运行的集群上处理 Amazon EMR 作业,则 EMR 步骤将使用
SageMakerPipelineExecutionEMRStepStatusUpdateRule
规则来监控 EMR 步骤的状态。如果您在 EMR 步骤创建的集群上处理作业,则该步骤会使用SageMakerPipelineExecutionEMRClusterStatusRule
规则监控集群状态的变化。如果您在 AWS 账户中看到这两项 EventBridge 规则中的任一项,请不要将其删除,否则 EMR 步骤可能无法完成。
在正在运行的 Amazon EMR 集群上启动新作业
要在运行中的 Amazon EMR 集群上启动新作业,请将集群 ID 作为字符串传递给 EMRStep
的 cluster_id
参数。以下示例演示了此过程。
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_config = EMRStepConfig( jar="
jar-location
", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) step_emr = EMRStep ( name="EMRSampleStep", # required cluster_id="j-1ABCDEFG2HIJK", # include cluster_id to use a running cluster step_config=emr_config, # required display_name="My EMR Step", description="Pipeline step to execute EMR job" )
有关指导您完成完整示例的示例笔记本,请参阅运行 EMR 集群的 Pipelines EMR 步骤
在新的 Amazon EMR 集群上启动新作业
要在 EMRStep
为您创建的新集群上启动新作业,请将集群配置作为字典提供。字典的结构必须与 RunJobFlow 请求相同。但是,请勿在集群配置中包含以下字段:
-
[
Name
] -
[
Steps
] -
[
AutoTerminationPolicy
] -
[
Instances
][KeepJobFlowAliveWhenNoSteps
] -
[
Instances
][TerminationProtected
]
所有其他 RunJobFlow
参数均可在您的集群配置中使用。有关请求语法的详细信息,请参阅 RunJobFlow。
下面的示例将集群配置传递给 EMR 步骤定义。这将提示在新的 EMR 集群上启动新作业的步骤。此示例中的 EMR 集群配置包括主节点和核心 EMR 集群节点的规范。有关 Amazon EMR 节点类型的更多信息,请参阅了解节点类型:主节点、核心节点和任务节点。
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_step_config = EMRStepConfig( jar="
jar-location
", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) # include your cluster configuration as a dictionary emr_cluster_config = { "Applications": [ { "Name": "Spark", } ], "Instances":{ "InstanceGroups":[ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.2xlarge" }, { "InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "m5.2xlarge" } ] }, "BootstrapActions":[], "ReleaseLabel": "emr-6.6.0", "JobFlowRole": "job-flow-role
", "ServiceRole": "service-role
" } emr_step = EMRStep( name="emr-step", cluster_id=None, display_name="emr_step", description="MyEMRStepDescription", step_config=emr_step_config, cluster_config=emr_cluster_config )
有关指导您完成完整示例的示例笔记本,请参阅使用集群生命周期管理的管道 EMR 步骤
使用 NotebookJobStep
作为管道步骤非交互式运行 SageMaker Notebook Job。如果您在 Pipelines 拖放用户界面中构建管道,则使用 执行代码步骤 运行笔记本。有关 SageMaker 笔记本作业的更多信息,请参阅 SageMaker Notebook Jobs。
NotebookJobStep
至少需要输入笔记本、映像 URI 和内核名称。有关笔记本作业步骤要求以及可用于自定义步骤的其他参数的更多信息,请参阅 sagemaker.workflow.steps.NotebookJobStep
下面的示例使用最小参数定义了 NotebookJobStep
。
from sagemaker.workflow.notebook_job_step import NotebookJobStep notebook_job_step = NotebookJobStep( input_notebook=
input_notebook
, image_uri=image_uri
, kernel_name=kernel_name
)
您的 NotebookJobStep
管道步骤被视为 SageMaker 笔记本作业。因此,在 Studio Classic UI 笔记本作业控制面板中,通过使用 tags
参数包含特定标签来跟踪执行状态。有关包含标签的更多详情,请参阅 在 Studio UI 面板上查看笔记本作业。
此外,如果您使用 SageMaker Python SDK 计划笔记本作业,则只能指定某些映像来运行笔记本作业。有关更多信息,请参阅 SageMaker Python SDK 笔记本作业的映像限制。
当未达到预期条件或状态时,使用 Fail 步骤停止 Amazon SageMaker Pipelines 的执行。在 Fail 步骤中还可以输入自定义错误信息,说明管道执行失败的原因。
注意
当一个 Fail 步骤和其他管道步骤同时执行时,管道在所有并发步骤完成之前不会终止。
使用 Fail 步骤的限制
-
您不能将 Fail 步骤添加到其他步骤的
DependsOn
列表中。有关更多信息,请参阅 步骤之间的自定义依赖关系。 -
其他步骤不能引用 Fail 步骤。它始终 是管道执行的最后一步。
-
您不能重试以 Fail 步骤结束的管道执行。
您可以创建静态文本字符串形式的 Fail 步骤错误信息。另外,如果您使用 SDK,也可以使用 Pipeline Parameters、Join