定义 Amazon SageMaker 模型构建管道 - Amazon SageMaker

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

定义 Amazon SageMaker 模型构建管道

要使用 Amazon SageMaker 模型构建管道来协调您的工作流程,请以JSON管道定义的形式生成有向无环图 (DAG)。 下图显示了您在本教程中创建的管道DAG:

管道定向无环图示例 (DAG)。

您可以使用 SageMaker Python 生成JSON管道定义SDK。 以下教程展示了如何生成管道定义。定义的管道解决了一个回归问题,可以根据鲍鱼的物理测量结果来确定鲍鱼的年龄。有关包含本教程内容的可运行 Jupyter 笔记本,请参阅使用 Amazon 模型构建管道编排作业。 SageMaker

先决条件

要运行以下教程,请完成以下操作:

  • 按照创建笔记本实例中所述的步骤设置笔记本实例。这使您的角色有权读取和写入 Amazon S3,并在中创建训练、批量转换和处理任务 SageMaker。

  • 授予笔记本获取和传递自身角色的权限,如修改角色权限策略中所示。添加以下JSON片段,将此策略附加到您的角色。<your-role-arn>替换为ARN用于创建笔记本实例的。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iam:GetRole", "iam:PassRole" ], "Resource": "<your-role-arn>" } ] }
  • 按照修改角色信任策略中的步骤信任 SageMaker 服务主体。将以下语句片段添加到角色的信任关系中:

    { "Sid": "", "Effect": "Allow", "Principal": { "Service": "sagemaker.amazonaws.com" }, "Action": "sts:AssumeRole" }

设置您的环境

使用以下代码块创建新 SageMaker 会话。这将返回会话ARN的角色。 此角色ARN应该是您作为先决条件设置的执行角色ARN。

import boto3 import sagemaker import sagemaker.session from sagemaker.workflow.pipeline_context import PipelineSession region = boto3.Session().region_name sagemaker_session = sagemaker.session.Session() role = sagemaker.get_execution_role() default_bucket = sagemaker_session.default_bucket() pipeline_session = PipelineSession() model_package_group_name = f"AbaloneModelPackageGroupName"

创建管道

重要

允许 Amazon SageMaker Studio 或 Amazon SageMaker Studio Classic 创建亚马逊 SageMaker资源的自定义IAM策略还必须授予向这些资源添加标签的权限。需要向资源添加标签的权限,因为 Studio 和 Studio Classic 会自动标记他们创建的任何资源。如果IAM策略允许 Studio 和 Studio Classic 创建资源但不允许标记,则在尝试创建资源时可能会出现 AccessDenied “” 错误。有关更多信息,请参阅 提供为资源添加标签 SageMaker的权限

AWS Amazon 托管政策 SageMaker授予创建 SageMaker 资源的权限已经包括在创建这些资源时添加标签的权限。

从 n SageMaker otebook 实例中运行以下步骤来创建包含以下步骤的管道:

  • 预处理

  • 训练

  • 评估

  • 条件评估

  • 模型注册

步骤 1:下载数据集

本笔记本使用 Machine Le UCI arning 鲍鱼数据集。该数据集包含以下特征:

  • length - 鲍鱼外壳最长测量值。

  • diameter - 垂直于长度方向的鲍鱼直径。

  • height - 带肉鲍鱼在壳内的高度。

  • whole_weight - 整只鲍鱼的重量。

  • shucked_weight - 从鲍鱼身上取出的肉的重量。

  • viscera_weight - 鲍鱼内脏出血后的重量。

  • shell_weight - 去肉和干燥后鲍鱼壳的重量。

  • sex - 鲍鱼的性别。“M”、“F”或“I”中的一个,其中“I”是幼鲍。

  • rings - 鲍鱼壳上的环数。

鲍鱼壳上的环数是其年龄的近似值,计算公式为 age=rings + 1.5。但是,获取此号码是一项耗时的任务。您必须从锥体上切壳,将切面染色,然后通过显微镜计算环数。但是,其他物理测量值更容易获得。此笔记本使用该数据集,利用其他物理测量值来构建 rings 变量的预测模型。

下载数据集
  1. 将数据集下载到您账户的默认 Amazon S3 存储桶中。

    !mkdir -p data local_path = "data/abalone-dataset.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset.csv", local_path ) base_uri = f"s3://{default_bucket}/abalone" input_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(input_data_uri)
  2. 创建模型后,下载第二个数据集进行批量转换。

    local_path = "data/abalone-dataset-batch.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset-batch", local_path ) base_uri = f"s3://{default_bucket}/abalone" batch_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(batch_data_uri)

步骤 2:定义管道参数

此代码块为您的管道定义了以下参数:

  • processing_instance_count - 处理作业的实例数。

  • input_data - 输入数据在 Amazon S3 中的位置。

  • batch_data - 用于批量转换的输入数据在 Amazon S3 中的位置。

  • model_approval_status - 为 CI/CD 注册已训练模型的批准状态。有关更多信息,请参阅 使用 SageMaker 项目MLOps实现自动化

from sagemaker.workflow.parameters import ( ParameterInteger, ParameterString, ) processing_instance_count = ParameterInteger( name="ProcessingInstanceCount", default_value=1 ) model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval" ) input_data = ParameterString( name="InputData", default_value=input_data_uri, ) batch_data = ParameterString( name="BatchData", default_value=batch_data_uri, )

步骤 3:定义特征工程的处理步骤

本节介绍如何创建一个处理步骤,从数据集中准备用于训练的数据。

创建处理步骤
  1. 为处理脚本创建目录。

    !mkdir -p abalone
  2. /abalone 目录中创建一个包含以下内容的名为 preprocessing.py 的文件。此预处理脚本被传递到处理步骤,以便在输入数据上运行。然后,训练步骤使用预处理的训练功能和标签来训练模型。评估步骤使用经过训练的模型和预处理的测试特征和标签来评估模型。该脚本使用 scikit-learn 执行以下操作:

    • 填入缺失的 sex 分类数据并对其进行编码,使其适合训练。

    • 缩放和标准化除 ringssex 之外的所有数值字段。

    • 将数据拆分为训练、测试和验证数据集。

    %%writefile abalone/preprocessing.py import argparse import os import requests import tempfile import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, OneHotEncoder # Because this is a headerless CSV file, specify the column names here. feature_columns_names = [ "sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", ] label_column = "rings" feature_columns_dtype = { "sex": str, "length": np.float64, "diameter": np.float64, "height": np.float64, "whole_weight": np.float64, "shucked_weight": np.float64, "viscera_weight": np.float64, "shell_weight": np.float64 } label_column_dtype = {"rings": np.float64} def merge_two_dicts(x, y): z = x.copy() z.update(y) return z if __name__ == "__main__": base_dir = "/opt/ml/processing" df = pd.read_csv( f"{base_dir}/input/abalone-dataset.csv", header=None, names=feature_columns_names + [label_column], dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype) ) numeric_features = list(feature_columns_names) numeric_features.remove("sex") numeric_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()) ] ) categorical_features = ["sex"] categorical_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("onehot", OneHotEncoder(handle_unknown="ignore")) ] ) preprocess = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features) ] ) y = df.pop("rings") X_pre = preprocess.fit_transform(df) y_pre = y.to_numpy().reshape(len(y), 1) X = np.concatenate((y_pre, X_pre), axis=1) np.random.shuffle(X) train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))]) pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False) pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False) pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
  3. 创建要传递到处理步骤的 SKLearnProcessor 的实例。

    from sagemaker.sklearn.processing import SKLearnProcessor framework_version = "0.23-1" sklearn_processor = SKLearnProcessor( framework_version=framework_version, instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name="sklearn-abalone-process", sagemaker_session=pipeline_session, role=role, )
  4. 创建处理步骤。此步骤采用 SKLearnProcessor、输入和输出通道以及您创建的 preprocessing.py 脚本。这与 SageMaker Python 中处理器实例的run方法非常相似SDK。传入 ProcessingStepinput_data 参数是步骤本身的输入数据。处理器实例运行时会使用这些输入数据。

    请注意在处理作业的输出配置中指定的 "train"validation"test" 命名通道。Properties诸如此类的步骤可以在后续步骤中使用,并在运行时解析为其运行时值。

    from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep processor_args = sklearn_processor.run( inputs=[ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py", ) step_process = ProcessingStep( name="AbaloneProcess", step_args=processor_args )

步骤 4:定义训练步骤

本节介绍如何使用 SageMaker XGBoost算法根据处理步骤输出的训练数据训练模型。

定义训练步骤
  1. 指定要保存训练模型的模型路径。

    model_path = f"s3://{default_bucket}/AbaloneTrain"
  2. 为XGBoost算法和输入数据集配置估计器。训练实例类型传递到估算器中。典型的训练脚本:

    • 加载来自输入通道的数据

    • 使用超参数配置训练

    • 训练模特

    • 将模型保存到,model_dir以便日后可以将其托管

    SageMaker 在训练作业结束model.tar.gz时将模型以 a 的形式上传到 Amazon S3。

    from sagemaker.estimator import Estimator image_uri = sagemaker.image_uris.retrieve( framework="xgboost", region=region, version="1.0-1", py_version="py3", instance_type="ml.m5.xlarge" ) xgb_train = Estimator( image_uri=image_uri, instance_type="ml.m5.xlarge", instance_count=1, output_path=model_path, sagemaker_session=pipeline_session, role=role, ) xgb_train.set_hyperparameters( objective="reg:linear", num_round=50, max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.7, silent=0 )
  3. TrainingStep使用估算器实例和的属性创建。ProcessingStep"train""validation"输出通道S3Uri的传入到TrainingStep。 

    from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep train_args = xgb_train.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, ) step_train = TrainingStep( name="AbaloneTrain", step_args = train_args )

步骤 5:定义模型评估的处理步骤

本节介绍如何创建处理步骤以评估模型的精度。在条件步骤中使用此模型评估的结果来确定要采用哪条运行路径。

定义模型评估的处理步骤
  1. 在名为 evaluation.py/abalone 目录中创建一个文件。此脚本在处理步骤中用于执行模型评估。它以经过训练的模型和测试数据集作为输入,然后生成包含分类评估指标的JSON文件。

    %%writefile abalone/evaluation.py import json import pathlib import pickle import tarfile import joblib import numpy as np import pandas as pd import xgboost from sklearn.metrics import mean_squared_error if __name__ == "__main__": model_path = f"/opt/ml/processing/model/model.tar.gz" with tarfile.open(model_path) as tar: tar.extractall(path=".") model = pickle.load(open("xgboost-model", "rb")) test_path = "/opt/ml/processing/test/test.csv" df = pd.read_csv(test_path, header=None) y_test = df.iloc[:, 0].to_numpy() df.drop(df.columns[0], axis=1, inplace=True) X_test = xgboost.DMatrix(df.values) predictions = model.predict(X_test) mse = mean_squared_error(y_test, predictions) std = np.std(y_test - predictions) report_dict = { "regression_metrics": { "mse": { "value": mse, "standard_deviation": std }, }, } output_dir = "/opt/ml/processing/evaluation" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) evaluation_path = f"{output_dir}/evaluation.json" with open(evaluation_path, "w") as f: f.write(json.dumps(report_dict))
  2. 创建 ScriptProcessor 的实例,用于创建 ProcessingStep

    from sagemaker.processing import ScriptProcessor script_eval = ScriptProcessor( image_uri=image_uri, command=["python3"], instance_type="ml.m5.xlarge", instance_count=1, base_job_name="script-abalone-eval", sagemaker_session=pipeline_session, role=role, )
  3. ProcessingStep使用处理器实例、输入和输出通道以及evaluation.py脚本创建。 传入:

    • step_train训练步骤中的S3ModelArtifacts属性

    • step_process处理步骤S3Uri"test"输出通道的

    这与 SageMaker Python 中处理器实例的run方法非常相似SDK。 

    from sagemaker.workflow.properties import PropertyFile evaluation_report = PropertyFile( name="EvaluationReport", output_name="evaluation", path="evaluation.json" ) eval_args = script_eval.run( inputs=[ ProcessingInput( source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model" ), ProcessingInput( source=step_process.properties.ProcessingOutputConfig.Outputs[ "test" ].S3Output.S3Uri, destination="/opt/ml/processing/test" ) ], outputs=[ ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"), ], code="abalone/evaluation.py", ) step_eval = ProcessingStep( name="AbaloneEval", step_args=eval_args, property_files=[evaluation_report], )

步骤 6: CreateModelStep 为批量转换定义一个

重要

从 Python 版本 2.90.0 起,我们建议使用模型步骤来创建模型。 SageMaker SDK CreateModelStep将继续在先前版本的 SageMaker Python 中运行SDK,但不再受到积极支持。

本节介绍如何根据训练步骤的输出创建 SageMaker 模型。此模型用于对新数据集进行批量转换。此步骤将传递到条件步骤,并且仅在条件步骤的计算结果为true时才运行。

CreateModelStep 为批量转换定义一个
  1. 创建 SageMaker 模型。从 step_train 训练步骤传入 S3ModelArtifacts 属性。

    from sagemaker.model import Model model = Model( image_uri=image_uri, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=pipeline_session, role=role, )
  2. 为您的 SageMaker 模型定义模型输入。

    from sagemaker.inputs import CreateModelInput inputs = CreateModelInput( instance_type="ml.m5.large", accelerator_type="ml.eia1.medium", )
  3. CreateModelStep使用您定义的CreateModelInput和 SageMaker 模型实例创建您的。

    from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=model, inputs=inputs, )

步骤 7:定义一个 TransformStep 以执行批量转换

本节介绍如何在模型训练完毕后创建 TransformStep 以对数据集执行批量转换。此步骤将传递到条件步骤,并且仅在条件步骤的计算结果为true时才运行。

要定义 TransformStep 要执行批量转换
  1. 使用适当的计算实例类型、实例数量和所需的输出 Amazon S3 存储桶创建转换器实例URI。从 step_create_model CreateModel 步骤传入 ModelName 属性。

    from sagemaker.transformer import Transformer transformer = Transformer( model_name=step_create_model.properties.ModelName, instance_type="ml.m5.xlarge", instance_count=1, output_path=f"s3://{default_bucket}/AbaloneTransform" )
  2. 使用您定义的转换器实例和 batch_data 管道参数创建 TransformStep

    from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )

步骤 8:定义创建模型包的 RegisterModel 步骤

重要

我们建议使用从 P 模型步骤 ython 版本 2.90.0 起注册模型。 SageMaker SDK RegisterModel将继续在先前版本的 SageMaker Python 中运行SDK,但不再受到积极支持。

本节介绍如何创建的实例RegisterModel。在管道RegisterModel中运行的结果是一个模型包。模型包是一种可重复使用的模型构件抽象,它封装了推理所需的所有要素。它由一个定义要使用的推理映像的推理规范和一个可选的模型权重位置组成。模型包组是模型包的集合。您可以使用 for Pipelin SageMaker es ModelPackageGroup 为每次管道运行向组中添加新版本和模型包。有关模型注册表的更多信息,请参阅使用模型注册表注册和部署模型

此步骤将传递到条件步骤,并且仅在条件步骤的计算结果为true时才运行。

定义创建模型包的 RegisterModel 步骤
  • 使用您用于训练步骤的估算器实例构造一个 RegisterModel 步骤。从 step_train 训练步骤传入 S3ModelArtifacts 属性并指定 ModelPackageGroup。 SageMaker Pipelines 会ModelPackageGroup为你创建这个。

    from sagemaker.model_metrics import MetricsSource, ModelMetrics from sagemaker.workflow.step_collections import RegisterModel model_metrics = ModelMetrics( model_statistics=MetricsSource( s3_uri="{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] ), content_type="application/json" ) ) step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

步骤 9:定义用于验证模型精度的条件步骤

A ConditionStep 允许 Pipelin SageMaker es 支持DAG根据步骤属性的条件在管道中进行有条件的运行。在这种情况下,只有当模型的精度超过所需值时,才需要注册该模型包。模型的准确性由模型评估步骤决定。如果精度超过所需值,管道还会创建一个 SageMaker 模型并对数据集运行批量转换。本节介绍如何定义条件步骤。

定义条件步骤以验证模型精度
  1. 使用模型评估处理步骤 step_eval 的输出中找到的精度值定义 ConditionLessThanOrEqualTo 条件。使用您在处理步骤中编制索引的属性文件以及相应的JSONPath均方误差值获取此输出。"mse"

    from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 )
  2. 构造一个 ConditionStep。传入 ConditionEquals 条件,如果条件通过,则将模型包注册和批量转换步骤设置为后续步骤。

    step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[], )

步骤 10:创建管道

现在,您已经创建了所有步骤,请将它们组合成一个管道。

要创建管道
  1. 为您的管道定义以下内容:nameparameterssteps。名称在 (account, region) 对中必须唯一。

    注意

    一个步骤只能在管道的步骤列表或条件步骤的 if/else 步骤列表中出现一次。不能同时出现在两者中。

    from sagemaker.workflow.pipeline import Pipeline pipeline_name = f"AbalonePipeline" pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, model_approval_status, input_data, batch_data, ], steps=[step_process, step_train, step_eval, step_cond], )
  2. (可选)检查JSON管道定义以确保其格式正确。

    import json json.loads(pipeline.definition())

此管道定义已准备好提交给 SageMaker。在下一个教程中,您将此管道提交到 SageMaker 并开始运行。

下一步:运行管道