增加一个步骤 - 亚马逊 SageMaker AI

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

增加一个步骤

下文介绍了每种步骤类型的要求,并提供了步骤的实现示例,以及如何将步骤添加到 Pipelines 中。这些都不是可行的实施方案,因为它们没有提供所需的资源和投入。有关实施这些步骤的教程,请参阅Pipelines 操作

注意

您还可以使用 @step 装饰器将本地机器学习代码转换为 Pipelines 步骤,从而从本地机器学习代码中创建步骤。有关更多信息,请参阅 @step decorator

Amaz SageMaker on Pipelines 支持以下步骤类型:

@step decorator

如果您想在 Pipelines 用户界面中编排利用高级 SageMaker AI 功能或其他 AWS 服务的自定义 ML 作业,请使用。 drag-and-drop 执行代码步骤

您可以使用 @step 装饰器从本地机器学习代码中创建一个步骤。测试代码后,您可以通过使用装饰器对其进行注释来将该函数转换为 SageMaker AI 管道步骤。@step当您将 @step 装饰功能的输出作为一个步骤传递给管道时,Pipelines 会创建并运行管道。您还可以创建一个多步骤 DAG 管道,其中包括一个或多个@step经过装饰的函数以及传统的 SageMaker AI 工作流步骤。有关如何使用 @step 装饰器创建步骤的更多详情,请参阅 Lift-and-shift 使用 @step 装饰器的 Python 代码

在 Pipelines drag-and-drop UI 中,您可以使用 “执行代码” 步骤将自己的代码作为工作流步骤运行。您可以上传一个 Python 函数、脚本或笔记本,作为管道的一部分执行。如果您想编排利用高级 SageMaker AI 功能或其他 AWS 服务的自定义 ML 作业,则应使用此步骤。

执行代码” 步骤会将文件上传到你的 Amazon A SageMaker I 默认 Amazon S3 存储桶。该存储桶可能未设置所需的跨源资源共享 (CORS) 权限。要了解有关配置 CORS 权限的更多信息,请参阅 输入映像数据的 CORS 要求

执行代码” 步骤使用 Amazon SageMaker 训练作业来运行您的代码。确保您的 IAM 角色具有 sagemaker:DescribeTrainingJobsagemaker:CreateTrainingJob API 权限。要详细了解 Amazon A SageMaker I 的所有必需权限以及如何设置这些权限,请参阅Amazon SageMaker AI API 权限:操作、权限和资源参考

要使用管道设计器向管道添加执行代码步骤,请执行以下操作:

  1. 按照中的说明打开 Amazon SageMaker Studio 控制台启动亚马逊 SageMaker Studio

  2. 在左侧导航窗格中,选择 Pipelines

  3. 选择创建

  4. 选择空白

  5. 在左侧边栏中选择执行代码,然后将其拖到画布上。

  6. 在画布中,选择添加的执行代码步骤。

  7. 在右侧边栏中,填写设置详情标签中的表格。

  8. 您可以上传单个文件来执行,也可以上传包含多个构件的压缩文件夹。

  9. 对于单个文件上传,您可以为笔记本、python 函数或脚本提供可选参数。

  10. 在提供 Python 函数时,必须以 file.py:<function_name> 格式提供处理程序。

  11. 对于上传压缩文件夹,必须提供代码的相对路径,您还可以选择提供压缩文件夹内 requirements.txt 文件或初始化脚本的路径。

  12. 如果画布上有任何步骤紧接在您添加的执行代码步骤之前,请单击并拖动光标从该步骤到执行代码步骤,以创建边缘。

  13. 如果画布中包含任何紧接您添加的执行代码步骤的步骤,请单击并拖动光标从执行代码步骤到该步骤,以创建边缘。可以引用执行代码步骤的输出作为 Python 函数。

使用处理步骤创建用于数据处理的处理作业。有关处理作业的更多信息,请参阅处理数据和评估模型

Pipeline Designer

要使用管道设计器向管道添加处理步骤,请执行以下操作:

  1. 按照中的说明打开 Amazon SageMaker Studio 控制台启动亚马逊 SageMaker Studio

  2. 在左侧导航窗格中,选择 Pipelines

  3. 选择创建

  4. 在左侧边栏中选择处理数据,然后将其拖到画布上。

  5. 在画布中,选择添加的处理数据步骤。

  6. 在右侧边栏中,填写设置详情标签中的表格。有关这些选项卡中字段的信息,请参阅 sagemaker.workflow.steps。 ProcessingStep

  7. 如果画布上有任何步骤紧接在您添加的处理数据步骤之前,请单击并拖动光标从该步骤到处理数据步骤,以创建边缘。

  8. 如果画布中包含任何紧接您添加的处理数据步骤的步骤,请单击并拖动光标从处理数据步骤到该步骤,以创建边缘。

SageMaker Python SDK

处理步骤需要处理器、定义处理代码的 Python 脚本、用于处理的输出以及作业参数。以下示例说明如何创建 ProcessingStep 定义。

from sagemaker.sklearn.processing import SKLearnProcessor sklearn_processor = SKLearnProcessor(framework_version='1.0-1', role=<role>, instance_type='ml.m5.xlarge', instance_count=1)
from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep inputs = [ ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] step_process = ProcessingStep( name="AbaloneProcess", step_args = sklearn_processor.run(inputs=inputs, outputs=outputs, code="abalone/preprocessing.py") )

传递运行时参数

以下示例说明如何将运行时参数从 PySpark处理器传递给ProcessingStep

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.spark.processing import PySparkProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep pipeline_session = PipelineSession() pyspark_processor = PySparkProcessor( framework_version='2.4', role=<role>, instance_type='ml.m5.xlarge', instance_count=1, sagemaker_session=pipeline_session, ) step_args = pyspark_processor.run( inputs=[ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"),], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="preprocess.py", arguments=None, ) step_process = ProcessingStep( name="AbaloneProcess", step_args=step_args, )

有关处理步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 ProcessingStep文档。有关深入的示例,请参阅使用 Amazon Pipelines 编排任务以训练和评估模型的 SageMaker 示例笔记本。为特征工程定义处理步骤部分包含更多信息。

您可以使用训练步骤创建训练作业来训练模型。有关训练作业的更多信息,请参阅使用 Amazon A SageMaker I 训练模型

训练步骤需要估算器以及训练和验证数据输入。

Pipeline Designer

要使用管道设计器向管道添加训练步骤,请执行以下操作:

  1. 按照中的说明打开 Amazon SageMaker Studio 控制台启动亚马逊 SageMaker Studio

  2. 在左侧导航窗格中,选择 Pipelines

  3. 选择创建

  4. 选择空白

  5. 在左侧边栏中选择训练模型,然后将其拖到画布上。

  6. 在画布中选择添加的训练模型步骤。

  7. 在右侧边栏中,填写设置详情标签中的表格。有关这些选项卡中字段的信息,请参阅 sagemaker.workflow.steps。 TrainingStep

  8. 如果画布上有任何步骤紧接在您添加的训练模型步骤之前,请单击并拖动光标从该步骤到训练模型步骤,以创建边缘。

  9. 如果画布上有任何步骤紧接着您添加的训练模型步骤,请单击并拖动光标从训练模型步骤到该步骤,以创建边缘。

SageMaker Python SDK

以下示例说明如何创建 TrainingStep 定义。有关训练步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 TrainingStep文档。

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep from sagemaker.xgboost.estimator import XGBoost pipeline_session = PipelineSession() xgb_estimator = XGBoost(..., sagemaker_session=pipeline_session) step_args = xgb_estimator.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) } ) step_train = TrainingStep( name="TrainAbaloneModel", step_args=step_args, )

您可以使用优化步骤来创建超参数优化作业,也称为超参数优化 (HPO)。超参数调整作业运行多个训练作业,每个作业产生一个模型版本。有关超参数优化的更多信息,请参阅使用 SageMaker AI 自动调整模型

调优作业与管道的 SageMaker AI 实验相关联,而训练作业是作为试验创建的。有关更多信息,请参阅 实验集成

调整步骤需要HyperparameterTuner和训练输入。您可以通过指定 HyperparameterTunerwarm_start_config 参数来再训练以前的优化作业。有关超参数优化和温启动的更多信息,请参阅运行热启动超参数调优作业

你可以使用 sagemaker.workflow.step s 的 get_top_model_s3_uri 方法。 TuningStepclass,用于从性能最好的模型版本之一中获取模型工件。有关显示如何在 SageMaker AI 管道中使用调整步骤的笔记本,请参阅 sagemaker-pipelines-tuning-step.ipynb。

重要

Amaz SageMaker on Python SDK v2.48.0 和 Amazon Studio Classic v3.8.0 中引入了调整步骤。 SageMaker 使用调整步骤前必须更新 Studio Classic,否则不会显示管道 DAG。要更新 Studio Classic,请参阅 关闭并更新 SageMaker Studio 经典版

以下示例说明如何创建 TuningStep 定义。

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.tuner import HyperparameterTuner from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TuningStep tuner = HyperparameterTuner(..., sagemaker_session=PipelineSession()) step_tuning = TuningStep( name = "HPTuning", step_args = tuner.fit(inputs=TrainingInput(s3_data="s3://amzn-s3-demo-bucket/my-data")) )

获取最佳模型版本

以下示例说明如何使用 get_top_model_s3_uri 方法从优化作业中获取最佳模型版本。最多只能根据排名前50位的版本进行排名HyperParameterTuningJobObjectivetop_k 参数是版本的索引,其中 top_k=0 是性能最好的版本,top_k=49 是性能最差的版本。

best_model = Model( image_uri=image_uri, model_data=step_tuning.get_top_model_s3_uri( top_k=0, s3_bucket=sagemaker_session.default_bucket() ), ... )

有关调整步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 TuningStep文档。

Fine-tuning 在新数据集上训练来自 Amaz SageMaker JumpStart on 的预训练基础模型。这个过程也称为转移学习,可以使用较小数据集和较短的训练时间生成准确模型。对模型进行微调时,可以使用默认数据集,也可以选择自己的数据。要了解有关微调基础模型的更多信息 JumpStart,请参阅微调模型

微调步骤使用 Amazon SageMaker 训练作业自定义您的模型。确保您的 IAM 角色具有 sagemaker:DescribeTrainingJobsagemaker:CreateTrainingJob API 权限,以便在管道中执行微调作业。API 权限,以便在管道中执行微调作业。要了解有关 Amazon A SageMaker I 所需权限以及如何设置权限的更多信息,请参阅Amazon SageMaker AI API 权限:操作、权限和资源参考

要使用 drag-and-drop编辑器向管道中添加 Fine-Tune 模型步骤,请执行以下步骤:

  1. 按照 启动亚马逊 SageMaker Studio 中的说明打开 Studio 管理控制台。

  2. 在左侧导航窗格中,选择 Pipelines

  3. 选择创建

  4. 选择空白

  5. 在左侧边栏中选择微调模型,然后将其拖动到画布上。

  6. 在画布中,选择添加的微调模型步骤。

  7. 在右侧边栏中,填写设置详情标签中的表格。

  8. 如果画布上有任何步骤紧接在您添加的微调模型步骤之前,请单击并拖动光标从该步骤到微调模型步骤,以创建边缘。

  9. 如果画布上有任何步骤紧接着您添加的微调模型步骤,请单击并拖动光标从微调模型步骤到该步骤,以创建边缘。

使用 AutoML API 创建 AutoML 作业以自动训练模型。有关 AutoML 任务的更多信息,请参阅使用 Ama SageMaker zon Autopilot 自动开发模型

注意

目前,AutoML 步骤仅支持组合训练模式

以下示例说明如何使用 AutoMLStep 创建定义。

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.automl_step import AutoMLStep pipeline_session = PipelineSession() auto_ml = AutoML(..., role="<role>", target_attribute_name="my_target_attribute_name", mode="ENSEMBLING", sagemaker_session=pipeline_session) input_training = AutoMLInput( inputs="s3://amzn-s3-demo-bucket/my-training-data", target_attribute_name="my_target_attribute_name", channel_type="training", ) input_validation = AutoMLInput( inputs="s3://amzn-s3-demo-bucket/my-validation-data", target_attribute_name="my_target_attribute_name", channel_type="validation", ) step_args = auto_ml.fit( inputs=[input_training, input_validation] ) step_automl = AutoMLStep( name="AutoMLStep", step_args=step_args, )

获取最佳模型版本

AutoML 步骤会自动训练多个候选模型。使用 get_best_auto_ml_model 方法从 AutoML 作业中获取目标指标最佳的模型,如下所示。您还必须使用 IAM role 访问模型构件。

best_model = step_automl.get_best_auto_ml_model(role=<role>)

有关更多信息,请参阅 Pyth SageMaker on 软件开发工具包中的 AutoML 步骤。

ModelStep使用创建或注册 A SageMaker I 模型。有关ModelStep要求的更多信息,请参阅 sagemaker.workflow. model_step。 ModelStep文档。

创建模型

您可以使用创建 ModelStep A SageMaker I 模型。ModelStep需要模型工件和有关创建模型所需的 SageMaker AI 实例类型的信息。有关 SageMaker AI 模型的更多信息,请参阅使用 Amazon A SageMaker I 训练模型

以下示例说明如何创建 ModelStep 定义。

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.model import Model from sagemaker.workflow.model_step import ModelStep step_train = TrainingStep(...) model = Model( image_uri=pytorch_estimator.training_image_uri(), model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=PipelineSession(), role=role, ) step_model_create = ModelStep( name="MyModelCreationStep", step_args=model.create(instance_type="ml.m5.xlarge"), )

注册模型

您可以使用 a ModelStep 在 Amazon SageMaker 模型注册中注册sagemaker.model.Model或。sagemaker.pipeline.PipelineModelPipelineModel 表示推理管道,是一个由处理推理请求的容器线性序列组成的模型。有关如何注册模型的更多信息,请参阅利用模型注册中心进行模型注册部署

以下示例说明如何创建一个注册 PipelineModelModelStep

import time from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel pipeline_session = PipelineSession() code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel( model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=pipeline_session, py_version='py3' ) xgboost_model = XGBoostModel( model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=pipeline_session, role=role ) from sagemaker.workflow.model_step import ModelStep from sagemaker import PipelineModel pipeline_model = PipelineModel( models=[sklearn_model, xgboost_model], role=role,sagemaker_session=pipeline_session, ) register_model_step_args = pipeline_model.register( content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', ) step_model_registration = ModelStep( name="AbaloneRegisterModel", step_args=register_model_step_args, )

您可以使用 “创建模型” 步骤来创建 A SageMaker I 模型。有关 SageMaker AI 模型的更多信息,请参阅使用 Amazon 训练模型 SageMaker

创建模型步骤需要模型工件和有关创建模型所需的 SageMaker AI 实例类型的信息。下面的示例展示了如何创建创建模型步骤定义。有关创建模型步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 CreateModelStep文档。

Pipeline Designer

要在管道中添加创建模型步骤,请执行以下操作:

  1. 按照 启动亚马逊 SageMaker Studio 中的说明打开 Studio 管理控制台。

  2. 在左侧导航窗格中,选择 Pipelines

  3. 选择创建

  4. 选择空白

  5. 在左侧边栏中选择创建模型,然后将其拖到画布上。

  6. 在画布中,选择添加的创建模型步骤。

  7. 在右侧边栏中,填写设置详情标签中的表格。有关这些选项卡中字段的信息,请参阅 sagemaker.workflow.steps。 CreateModelStep

  8. 如果画布上有任何步骤紧接在您添加的创建模型步骤之前,请单击并拖动光标从该步骤到创建模型步骤,以创建边缘。

  9. 如果画布上有任何步骤紧接着您添加的创建模型步骤,请单击并拖动光标从创建模型步骤到该步骤,以创建边缘。

SageMaker Python SDK
重要

从 AI SageMaker Py 模型步骤 thon SDK 版本 2.90.0 起,我们建议使用创建模型。 CreateModelStep将继续在以前版本的 SageMaker Python SDK 中运行,但不再受支持。

from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=best_model, inputs=inputs )

“注册模型” 步骤将模型注册到 “ SageMaker 模型注册表”。

Pipeline Designer

要使用管道设计器从管道中注册模型,请执行以下操作:

  1. 按照中的说明打开 Amazon SageMaker Studio 控制台启动亚马逊 SageMaker Studio

  2. 在左侧导航窗格中,选择 Pipelines

  3. 选择创建

  4. 选择空白

  5. 在左侧边栏中选择注册模型,然后将其拖到画布上。

  6. 在画布中,选择添加的注册模型步骤。

  7. 在右侧边栏中,填写设置详情标签中的表格。有关这些选项卡中字段的信息,请参阅 sagemaker.workflow.step_collections。 RegisterModel

  8. 如果画布上有任何步骤紧接在您添加的注册模型步骤之前,请单击并拖动光标从该步骤到注册模型步骤,以创建边缘。

  9. 如果画布中包含任何紧接您添加的注册模型步骤的步骤,请单击并拖动光标从注册模型步骤到该步骤,以创建边缘。

SageMaker Python SDK
重要

我们建议使用模型步骤从 AI SageMaker Python SDK 版本 2.90.0 起注册模型。 RegisterModel将继续在以前版本的 SageMaker Python SDK 中运行,但不再受支持。

你可以使用RegisterModel步骤注册 sagemaker.model.model 或 sagemaker.pipeline。 PipelineModel使用 Amazon SageMaker 模型注册表。PipelineModel 表示推理管道,是一个由处理推理请求的容器线性序列组成的模型。

有关如何注册模型的更多信息,请参阅利用模型注册中心进行模型注册部署。有关RegisterModel步骤要求的更多信息,请参阅 s agemaker.workflow.step_collections。 RegisterModel文档。

以下示例说明如何创建注册 PipelineModelRegisterModel 步骤。

import time from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel(model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=sagemaker_session, py_version='py3') xgboost_model = XGBoostModel(model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=sagemaker_session, role=role) from sagemaker.workflow.step_collections import RegisterModel from sagemaker import PipelineModel pipeline_model = PipelineModel(models=[sklearn_model,xgboost_model],role=role,sagemaker_session=sagemaker_session) step_register = RegisterModel( name="AbaloneRegisterModel", model=pipeline_model, content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', )

如果未提供 model,则注册模型步骤需要使用估算器,如以下示例所示。

from sagemaker.workflow.step_collections import RegisterModel step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

在管道设计器中,使用部署模型(端点)步骤将模型部署到端点。您可以创建一个新端点或使用现有端点。实时推理非常适合有实时、交互式、低延迟要求的推理工作负载。您可以将模型部署到 SageMaker AI Hosting 服务,并获得可用于推理的实时终端节点。这些端点可完全托管,并支持自动扩缩。要了解有关 SageMaker AI 中实时推理的更多信息,请参阅实时推理

在管道中添加部署模型步骤之前,请确保您的 IAM 角色具有以下权限:

  • sagemaker:CreateModel

  • sagemaker:CreateEndpointConfig

  • sagemaker:CreateEndpoint

  • sagemaker:UpdateEndpoint

  • sagemaker:DescribeModel

  • sagemaker:DescribeEndpointConfig

  • sagemaker:DescribeEndpoint

要详细了解 SageMaker AI 所需的所有权限以及如何设置这些权限,请参阅Amazon SageMaker AI API 权限:操作、权限和资源参考

要在 drag-and-drop编辑器中向流水线添加模型部署步骤,请完成以下步骤:

  1. 按照 启动亚马逊 SageMaker Studio 中的说明打开 Studio 管理控制台。

  2. 在左侧导航窗格中,选择 Pipelines

  3. 选择创建

  4. 选择空白

  5. 在左侧边栏中选择部署模型(端点),然后将其拖到画布上。

  6. 在画布中,选择添加的部署模型(端点)步骤。

  7. 在右侧边栏中,填写设置详情标签中的表格。

  8. 如果画布上有任何步骤紧接在您添加的部署模型(端点)步骤之前,请单击并拖动光标,从该步骤到部署模型(端点)步骤,以创建边缘。

  9. 如果画布中包含任何紧接您添加的部署模型(端点)步骤的步骤,请单击并将光标从部署模型(端点)步骤拖动到该步骤,以创建边缘。

您可以使用转换步骤进行批量转换,在整个数据集上运行推理。有关批量转换的更多信息,请参阅利用推理管道进行批量转换

转换步骤需要转换器和用于运行批量转换的数据。下面的示例显示了如何创建变换步骤定义。有关转换步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 TransformStep文档。

Pipeline Designer

要使用 drag-and-drop可视化编辑器向管道添加批量转换步骤,请执行以下操作:

  1. 按照 启动亚马逊 SageMaker Studio 中的说明打开 Studio 管理控制台。

  2. 在左侧导航窗格中,选择 Pipelines

  3. 选择创建

  4. 选择空白

  5. 在左侧边栏中选择部署模型(批量转换),然后将其拖到画布上。

  6. 在画布中,选择添加的部署模型(批量转换)步骤。

  7. 在右侧边栏中,填写设置详情标签中的表格。有关这些选项卡中字段的信息,请参阅 sagemaker.workflow.steps。 TransformStep

  8. 如果画布上有任何步骤紧接在您添加的部署模型(批量转换)步骤之前,请单击并拖动光标从该步骤到部署模型(批量转换)步骤,以创建边缘。

  9. 如果画布中包含任何紧接您添加的部署模型(批量转换)步骤的步骤,请单击并将光标从部署模型(批量转换)步骤拖动到该步骤,以创建边缘。

SageMaker Python SDK
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.transformer import Transformer from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep transformer = Transformer(..., sagemaker_session=PipelineSession()) step_transform = TransformStep( name="AbaloneTransform", step_args=transformer.transform(data="s3://amzn-s3-demo-bucket/my-data"), )

您可以使用条件步骤来评估步骤属性的条件,以评估管道中下一步应该采取的操作。

需要一个条件步骤:

  • 一个条件列表。

  • 如果条件评估结果为 true,则要运行的步骤列表。

  • 如果条件评估结果为 false,则要运行的步骤列表。

Pipeline Designer

要使用 Pipeline Designer 向管道添加条件步骤,请执行以下操作:

  1. 按照中的说明打开 Amazon SageMaker Studio 控制台启动亚马逊 SageMaker Studio

  2. 在左侧导航窗格中,选择 Pipelines

  3. 选择创建

  4. 选择空白

  5. 在左侧边栏中选择条件,然后将其拖到画布上。

  6. 在画布中,选择添加的条件步骤。

  7. 在右侧边栏中,填写设置详情标签中的表格。有关这些选项卡中字段的信息,请参阅 sagemaker.workflow.condition_step。 ConditionStep

  8. 如果画布上有任何步骤紧接在您添加的条件步骤之前,请单击并拖动光标从该步骤到条件步骤,以创建边缘。

  9. 如果画布中包含任何紧接您添加的条件步骤的步骤,请单击并拖动光标从条件步骤到该步骤,以创建边缘。

SageMaker Python SDK

以下示例说明如何创建 ConditionStep 定义。

限制

  • 管道不支持使用嵌套条件步骤。不能将一个条件步骤作为另一个条件步骤的输入进行传递。

  • 一个条件步骤不能在两个分支中使用相同的步骤。如果需要在两个分支中使用相同的步骤功能,请复制该步骤并为其指定不同的名称。

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 ) step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[] )

有关ConditionStep要求的更多信息,请参阅 sagemaker.workflow.con dition_step。 ConditionStepAPI 参考。有关支持条件的更多信息,请参阅 A SageMaker I Python SDK 文档中的 Amazon Pipelin SageMaker es-条件

使用Callback步骤向您的工作流程中添加并非由 Amazon Pipelines 直接提供的其他流程和 AWS SageMaker 服务。当 Callback 步骤运行时,会发生以下过程:

  • Pipelines 向客户指定的 Amazon Simple Queue Service (Amazon SQS) 队列发送消息。该信息包含一个 Pipelines 生成的标记和客户提供的输入参数列表。发送信息后,Pipelines 会等待客户的回复。

  • 客户从 Amazon SQS 队列中检索消息并启动他们的自定义流程。

  • 该过程完成后,客户调用以下方法之一 APIs 并提交 Pipelines 生成的令牌:

  • API 调用会导致 Pipelines 继续执行管道流程或使流程失败。

有关Callback步骤要求的更多信息,请参阅 sagemaker.workflow.cal lback_step。 CallbackStep文档。有关完整的解决方案,请参阅使用回调步骤扩展 SageMaker 管道以包含自定义步骤

重要

Callback亚马逊 SageMaker Python SDK v2.45.0 和 Amazon SageMaker Studio Classic v3.6.2 中引入了步骤。在使用 Callback 步骤之前,您必须更新 Studio Classic,否则不会显示管道 DAG。要更新 Studio Classic,请参阅 关闭并更新 SageMaker Studio 经典版

下面的示例展示了上述程序的执行过程。

from sagemaker.workflow.callback_step import CallbackStep step_callback = CallbackStep( name="MyCallbackStep", sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue", inputs={...}, outputs=[...] ) callback_handler_code = ' import boto3 import json def handler(event, context): sagemaker_client=boto3.client("sagemaker") for record in event["Records"]: payload=json.loads(record["body"]) token=payload["token"] # Custom processing # Call SageMaker AI to complete the step sagemaker_client.send_pipeline_execution_step_success( CallbackToken=token, OutputParameters={...} ) '
注意

不得嵌套 CallbackStep 的输出参数。例如,如果您使用嵌套字典作为输出参数,则该字典将被视为单个字符串(例如 {"output1": "{\"nested_output1\":\"my-output\"}"})。 如果您提供嵌套值,则当您尝试引用特定的输出参数时, SageMaker AI 会抛出不可重试的客户端错误。

停止行为

Callback 步骤运行时,管道进程不会停止。

当您使用正在运行的Callback步骤调用StopPipelineExecution管道进程时,Pipelines 会向 SQS 队列发送一条 Amazon SQS 消息。SQS 消息正文包含一个状态字段,该字段设置为 Stopping。以下是 SQS 消息正文示例。

{ "token": "26vcYbeWsZ", "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a", "arguments": { "number": 5, "stringArg": "some-arg", "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv" }, "status": "Stopping" }

您应该在 Amazon SQS 消息用户中添加逻辑,以便在收到消息后采取任何必要的措施(例如,资源清理)。然后添加对 SendPipelineExecutionStepSuccessSendPipelineExecutionStepFailure 的调用。

只有当 Pipelines 收到这些调用时,它才会停止管道进程。

您可以使用 Lambda 步骤来运行函数。 AWS Lambda 您可以运行现有的 Lambda 函数,或者 SageMaker AI 可以创建并运行新的 Lambda 函数。有关展示如何在 SageMaker AI 管道中使用 Lambda 步骤的笔记本,请参阅 sagemaker-pipelines-lambda-step .ipynb。

重要

亚马逊 Pyth SageMaker on SDK v2.51.0 和亚马 SageMaker 逊 Studio Classic v3.9.1 中引入了 Lambda 步骤。您必须在使用 Lambda 步骤之前更新 Studio Classic,否则不会显示管道 DAG。要更新 Studio Classic,请参阅 关闭并更新 SageMaker Studio 经典版

SageMaker AI 提供了 sagemaker.lambda_Helper.Lambda 类来创建、更新、调用和删除 Lambda 函数。 Lambda具有以下签名。

Lambda( function_arn, # Only required argument to invoke an existing Lambda function # The following arguments are required to create a Lambda function: function_name, execution_role_arn, zipped_code_dir, # Specify either zipped_code_dir and s3_bucket, OR script s3_bucket, # S3 bucket where zipped_code_dir is uploaded script, # Path of Lambda function script handler, # Lambda handler specified as "lambda_script.lambda_handler" timeout, # Maximum time the Lambda function can run before the lambda step fails ... )

sagemaker. workflow.lambda_step。 LambdaStep类的lambda_func参数类型为Lambda。要调用现有 Lambda 函数,唯一的要求是将函数的 Amazon 资源名称 (ARN) 提供给 function_arn。如果不提供 function_arn 的值,则必须指定 handler 和以下内容之一:

  • zipped_code_dir - 压缩后的 Lambda 函数的路径

    s3_bucket - 要上传 zipped_code_dir 的 Amazon S3 存储桶

  • script - Lambda 函数脚本文件的路径

以下示例说明如何创建调用现有 Lambda 函数的 Lambda 步骤定义。

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda" ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

以下示例说明如何创建 Lambda 步骤定义,该定义使用 Lambda 函数脚本创建和调用 Lambda 函数。

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_name="split-dataset-lambda", execution_role_arn=execution_role_arn, script="lambda_script.py", handler="lambda_script.lambda_handler", ... ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

输入和输出

如果您的 Lambda 函数有输入或输出,则还必须在 Lambda 步骤中定义这些输入或输出。

注意

不得嵌套输入和输出参数。例如,如果您使用嵌套字典作为输出参数,则该字典将被视为单个字符串(例如 {"output1": "{\"nested_output1\":\"my-output\"}"})。如果您提供嵌套值并尝试稍后再引用该值,则会引发不可重试的客户端错误。

定义 Lambda 步骤时,inputs 必须是键值对的字典。inputs 字典的每个值都必须是基元类型(字符串、整数或浮点数)。不支持嵌套对象。如果 inputs 值未定义,则默认为 None

outputs 值必须是键列表。这些键是指 Lambda 函数输出中定义的字典。比如 inputs,这些键必须是基元类型,并且不支持嵌套对象。

超时和停止行为

Lambda 类有一个 timeout 参数,用于指定 Lambda 函数可以运行的最长时间。默认值为 120 秒,最大值为 10 分钟。如果在达到超时条件时 Lambda 函数正在运行,则 Lambda 步骤将失败;但 Lambda 函数会继续运行。

当 Lambda 步骤正在运行时,无法停止管道进程,因为无法停止 Lambda 步骤调用的 Lambda 函数。如果您在 Lambda 函数运行时停止进程,管道会等待函数完成或超时。这取决于哪个先发生。进程随即停止。如果 Lambda 函数完成,则管道进程状态为 Stopped。如果达到超时,则管道进程状态为 Failed

您可以使用 ClarifyCheck 步骤对照先前的基准进行基准偏差检查,以进行偏差分析和实现模型可解释性。然后,您可以使用 model.register() 方法生成和注册基准,并使用 step_args 将该方法的输出传递给 模型步骤。Amazon SageMaker 模型监控器可以将这些偏差检查基准用于您的模型终端节点。因此,您无需单独提出基线建议。

ClarifyCheck 步骤还可以从模型注册表中提取偏差检查基准。该ClarifyCheck步骤使用 Clarify SageMaker 预先构建的容器。该容器提供一系列模型监测功能,包括约束建议和根据给定基线进行约束验证。有关更多信息,请参阅 预建的 SageMaker 澄清容器

配置 ClarifyCheck 步骤

您可以将 ClarifyCheck 步骤配置为每次在管道中使用时仅执行以下一种类型的检查。

  • 数据偏差检查

  • 模型偏差检查

  • 模型可解释性检查

为此,请将 clarify_check_config 参数设置为以下检查类型值之一:

  • DataBiasCheckConfig

  • ModelBiasCheckConfig

  • ModelExplainabilityCheckConfig

ClarifyCheck步骤启动一个处理作业,该作业运行 SageMaker AI Clarify 预建容器,并且需要为支票和处理作业进行专用配置ClarifyCheckConfigCheckJobConfig是这些配置的辅助函数。这些辅助函数与 Clarify 处理 SageMaker 作业的计算方式一致,用于检查模型偏差、数据偏差或模型可解释性。有关更多信息,请参阅 运行 Cl SageMaker arify 处理作业以实现偏见分析和可解释性

控制偏差检查的步骤行为

ClarifyCheck 步骤需要以下两个布尔标志来控制其行为:

  • skip_check:此参数表示是否跳过针对先前基准的偏差检查。如果将其设置为 False,则配置的检查类型的先前基准必须可用。

  • register_new_baseline:此参数表示是否可以通过步骤属性 BaselineUsedForDriftCheckConstraints 访问新计算的基准。如果将其设置为 False,则配置的检查类型的先前基准也必须可用。这可以通过 BaselineUsedForDriftCheckConstraints 属性访问。

有关更多信息,请参阅 基准计算、偏差检测和生命周期以及 Amazon Pi ClarifyCheck pelin SageMaker es 中的 QualityCheck 步骤

使用基准

您可以选择指定 model_package_group_name 来定位现有基线。然后,ClarifyCheck 步骤在模型软件包组中最新批准的模型软件包上提取 DriftCheckBaselines

或者,您可以通过 supplied_baseline_constraints 参数提供先前的基准。如果同时指定 model_package_group_namesupplied_baseline_constraints,则 ClarifyCheck 步骤将使用 supplied_baseline_constraints 参数指定的基准。

有关使用ClarifyCheck步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 ClarifyCheckStep在适用于 Python 的亚马逊 SageMaker AI SageMaker 人工智能开发工具包中。有关展示如何使用 Pipelines 中ClarifyCheck步骤的 Amazon SageMaker Studio Classic 笔记本,请参阅 sagemaker-pipeline-model-monitor-clarify-steps.ipynb

例 创建 ClarifyCheck 步骤以进行数据偏差检查
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_bias_data_config = DataConfig( s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep']), label=0, dataset_type="text/csv", s3_analysis_config_output_path=data_bias_analysis_cfg_output_path, ) data_bias_config = BiasConfig( label_values_or_threshold=[15.0], facet_name=[8], facet_values_or_threshold=[[0.5]] ) data_bias_check_config = DataBiasCheckConfig( data_config=data_bias_data_config, data_bias_config=data_bias_config, )h data_bias_check_step = ClarifyCheckStep( name="DataBiasCheckStep", clarify_check_config=data_bias_check_config, check_job_config=check_job_config, skip_check=False, register_new_baseline=False supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json", model_package_group_name="MyModelPackageGroup" )

使用 QualityCheck 步骤对管道中的数据质量或模型质量进行基线建议和漂移检查。然后,您可以使用 model.register() 方法生成和注册基准,并使用 step_args 将该方法的输出传递给 模型步骤

Model Monitor 可以将这些偏差检查基准用于模型端点,这样您就无需单独提出基准建议。QualityCheck 步骤还可以从模型注册表中提取偏差检查基准。该QualityCheck步骤利用了 Amazon A SageMaker I 模型监控器预先构建的容器。该容器具有一系列模型监测功能,包括约束建议、统计数据生成和对照基线进行约束验证。有关更多信息,请参阅 Amazon SageMaker 模型监控器预建容器

配置 QualityCheck 步骤

您可以对 QualityCheck 步骤进行配置,使其每次在管道中使用时只运行以下一种检查类型。

  • 数据质量检查

  • 模型质量检查

您可以通过将 quality_check_config 参数设置为以下检查类型值之一来实现此目的:

  • DataQualityCheckConfig

  • ModelQualityCheckConfig

QualityCheck 步骤会启动一个处理作业,该作业运行 Model Monitor 预构建容器,并且需要专门的用于检查和处理作业的配置。QualityCheckConfigCheckJobConfig 是这些配置的辅助功能。这些辅助功能与模型监控器为模型质量或数据质量监控创建基线的方式一致。有关 Model Monitor 基准建议的更多信息,请参阅创建基准创建模型质量基线

控制偏差检查的步骤行为

QualityCheck 步骤需要以下两个布尔标志来控制其行为:

  • skip_check:此参数表示是否跳过针对先前基准的偏差检查。如果将其设置为 False,则配置的检查类型的先前基准必须可用。

  • register_new_baseline:此参数表示是否可以通过步骤属性 BaselineUsedForDriftCheckConstraintsBaselineUsedForDriftCheckStatistics 访问新计算的基准。如果将其设置为 False,则配置的检查类型的先前基准也必须可用。它们可以通过 BaselineUsedForDriftCheckConstraintsBaselineUsedForDriftCheckStatistics 属性进行访问。

有关更多信息,请参阅基准计算、偏差检测和生命周期以及 Amazon Pi ClarifyCheck pelin SageMaker es 中的 QualityCheck 步骤

使用基准

您可以通过 supplied_baseline_statisticssupplied_baseline_constraints 参数直接指定上一条基线。您还可以指定 model_package_group_nameQualityCheck 步骤,在模型软件包组中最新批准的模型软件包上拉动 DriftCheckBaselines

指定以下内容时,QualityCheck 步骤将使用 supplied_baseline_constraintssupplied_baseline_statisticsQualityCheck 步骤的校验类型上指定的基线。

  • model_package_group_name

  • supplied_baseline_constraints

  • supplied_baseline_statistics

有关使用QualityCheck步骤要求的更多信息,请参阅 sagemaker.workflow.steps。 QualityCheckStep在适用于 Python 的亚马逊 SageMaker AI SageMaker 人工智能开发工具包中。有关展示如何使用 Pipelines 中QualityCheck步骤的 Amazon SageMaker Studio Classic 笔记本,请参阅 sagemaker-pipeline-model-monitor-clarify-steps.ipynb

例 创建 QualityCheck 步骤以进行数据质量检查
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_quality_check_config = DataQualityCheckConfig( baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"), output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep']) ) data_quality_check_step = QualityCheckStep( name="DataQualityCheckStep", skip_check=False, register_new_baseline=False, quality_check_config=data_quality_check_config, check_job_config=check_job_config, supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json", supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json", model_package_group_name="MyModelPackageGroup" )

使用 Amazon P ip SageMaker elines EMR 步骤可以:

  • 在运行中的 Amazon EMR 集群上处理 Amazon EMR 步骤

  • 让管道为您创建和管理 Amazon EMR 集群。

有关 Amazon EMR 的更多信息,请参阅 Amazon EMR 入门

EMR 步骤要求 EMRStepConfig 包括 Amazon EMR 集群使用的 JAR 文件的位置以及要传递的任何参数。如果您要在运行中的 EMR 集群上运行该步骤,还需提供 Amazon EMR 集群 ID。您还可以通过集群配置,在它为您创建、管理和终止的集群上运行 EMR 步骤。以下几节包括演示这两种方法的示例笔记本的示例和链接。

注意
  • EMR 步骤要求传递给管道的角色具有其他权限。将 AWS 托管式策略附加到管道角色:AmazonSageMakerPipelinesIntegrations 到管道角色,或确保角色包含该策略中的权限。

  • EMR Serverless 不支持 EMR 步骤。EKS 上的 Amazon EMR 也不支持该功能。

  • 如果您在运行中的集群上处理 EMR 步骤,则只能使用处于以下状态之一的集群:

    • STARTING

    • BOOTSTRAPPING

    • RUNNING

    • WAITING

  • 如果您在正在运行的集群上处理 EMR 步骤,则在 EMR 集群上最多可以有 256 个处于 PENDING 状态的 EMR 步骤。提交的 EMR 步骤超过此限制会导致管道执行失败。您可以考虑使用管道步骤的重试策略

  • 您可以指定集群 ID 或集群配置,但不能同时指定两者。

  • EMR 步骤依靠 Amazon EventBridge 来监控 EMR 步骤或集群状态的变化。如果您在正在运行的集群上处理 Amazon EMR 作业,则 EMR 步骤将使用 SageMakerPipelineExecutionEMRStepStatusUpdateRule 规则来监控 EMR 步骤的状态。如果您在 EMR 步骤创建的集群上处理作业,则该步骤会使用 SageMakerPipelineExecutionEMRClusterStatusRule 规则监控集群状态的变化。如果您在 AWS 账户中看到其中任何一条 EventBridge 规则,请不要将其删除,否则您的 EMR 步骤可能无法完成。

在正在运行的 Amazon EMR 集群上启动新作业

要在运行中的 Amazon EMR 集群上启动新作业,请将集群 ID 作为字符串传递给 EMRStepcluster_id 参数。以下示例演示了此过程。

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_config = EMRStepConfig( jar="jar-location", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) step_emr = EMRStep ( name="EMRSampleStep", # required cluster_id="j-1ABCDEFG2HIJK", # include cluster_id to use a running cluster step_config=emr_config, # required display_name="My EMR Step", description="Pipeline step to execute EMR job" )

有关指导您完成完整示例的示例笔记本,请参阅运行 EMR 集群的 Pipelines EMR 步骤

在新的 Amazon EMR 集群上启动新作业

要在 EMRStep 为您创建的新集群上启动新作业,请将集群配置作为字典提供。字典必须与RunJobFlow请求具有相同的结构。但是,请勿在集群配置中包含以下字段:

  • [Name]

  • [Steps]

  • [AutoTerminationPolicy]

  • [Instances][KeepJobFlowAliveWhenNoSteps]

  • [Instances][TerminationProtected]

所有其他 RunJobFlow 参数均可在您的集群配置中使用。有关请求语法的详细信息,请参阅RunJobFlow

下面的示例将集群配置传递给 EMR 步骤定义。这将提示在新的 EMR 集群上启动新作业的步骤。此示例中的 EMR 集群配置包括主节点和核心 EMR 集群节点的规范。有关 Amazon EMR 节点类型的更多信息,请参阅了解节点类型:主节点、核心节点和任务节点

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_step_config = EMRStepConfig( jar="jar-location", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) # include your cluster configuration as a dictionary emr_cluster_config = { "Applications": [ { "Name": "Spark", } ], "Instances":{ "InstanceGroups":[ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.2xlarge" }, { "InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "m5.2xlarge" } ] }, "BootstrapActions":[], "ReleaseLabel": "emr-6.6.0", "JobFlowRole": "job-flow-role", "ServiceRole": "service-role" } emr_step = EMRStep( name="emr-step", cluster_id=None, display_name="emr_step", description="MyEMRStepDescription", step_config=emr_step_config, cluster_config=emr_cluster_config )

有关指导您完成完整示例的示例笔记本,请参阅使用集群生命周期管理的管道 EMR 步骤

使用NotebookJobStep以非交互方式运行您的 SageMaker Notebook Job 作为工作流步骤。如果您在 Pipelines drag-and-drop 用户界面中构建管道,请使用执行代码步骤来运行您的笔记本。有关 SageMaker 笔记本作业的更多信息,请参阅SageMaker 笔记本职位

NotebookJobStep 至少需要输入笔记本、映像 URI 和内核名称。有关 Notebook Job 步骤要求以及可以设置为自定义步骤的其他参数的更多信息,请参阅 sagemaker.workflow.steps。 NotebookJobStep

下面的示例使用最小参数定义了 NotebookJobStep

from sagemaker.workflow.notebook_job_step import NotebookJobStep notebook_job_step = NotebookJobStep( input_notebook=input_notebook, image_uri=image_uri, kernel_name=kernel_name )

您的工作NotebookJobStep流步骤被视为 SageMaker 笔记本作业。因此,在 Studio Classic UI 笔记本作业控制面板中,通过使用 tags 参数包含特定标签来跟踪执行状态。有关包含标签的更多详情,请参阅 在 Studio UI 面板上查看笔记本作业

此外,如果您使用 SageMaker Python SDK 安排笔记本作业,则只能指定某些图像来运行笔记本作业。有关更多信息,请参阅 SageMaker AI Python SDK 笔记本作业的图像限制

当未达到所需条件或状态时,使用 “失败” 步骤停止 Amazon Pipelin SageMaker es 的执行。在 Fail 步骤中还可以输入自定义错误信息,说明管道执行失败的原因。

注意

当一个 Fail 步骤和其他管道步骤同时执行时,管道在所有并发步骤完成之前不会终止。

使用 Fail 步骤的限制

  • 您不能将 Fail 步骤添加到其他步骤的 DependsOn 列表中。有关更多信息,请参阅 步骤之间的自定义依赖关系

  • 其他步骤不能引用 Fail 步骤。它始终 是管道执行的最后一步。

  • 您不能重试以 Fail 步骤结束的管道执行。

您可以创建静态文本字符串形式的 Fail 步骤错误信息。另外,如果您使用 SDK,也可以使用 Pipeline ParametersJoin 操作或其他步骤属性来创建信息量更大的错误信息。

Pipeline Designer

要在管道中添加 Fail 步骤,请执行以下操作:

  1. 按照 启动亚马逊 SageMaker Studio 中的说明打开 Studio 管理控制台。

  2. 在左侧导航窗格中,选择 Pipelines

  3. 选择创建

  4. 选择空白

  5. 在左侧边栏中选择 Fail,然后将其拖到画布上。

  6. 在画布中,选择添加的 Fail 步骤。

  7. 在右侧边栏中,填写设置详情标签中的表格。有关这些选项卡中字段的信息,请参阅 sagemaker.workflow.fail_step。 FailStep

  8. 如果画布上有任何步骤紧接在您添加的 Fail 步骤之前,请单击并拖动光标从该步骤到 Fail 步骤,以创建边缘。

  9. 如果画布上有任何步骤紧接着您添加的 Fail 步骤,请单击并拖动光标从 Fail 步骤到该步骤,以创建边缘。

SageMaker Python SDK

以下示例代码片段使用了一个 FailStep(带有 ErrorMessage,并配置了管道参数)和一项 Join 操作。

from sagemaker.workflow.fail_step import FailStep from sagemaker.workflow.functions import Join from sagemaker.workflow.parameters import ParameterInteger mse_threshold_param = ParameterInteger(name="MseThreshold", default_value=5) step_fail = FailStep( name="AbaloneMSEFail", error_message=Join( on=" ", values=["Execution failed due to MSE >", mse_threshold_param] ), )