使用 @step 装饰函数创建管道 - 亚马逊 SageMaker AI

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 @step 装饰函数创建管道

您可以使用@step装饰器将 Python 函数转换为工作流步骤,在这些函数之间创建依赖关系以创建流水线图(或有向无环图 (DAG)),然后将该图的叶节点作为步骤列表传递给管道,从而创建管道。下文将结合示例详细解释这一程序。

将函数转换为步骤

要使用 @step 装饰器创建一个步骤,请使用 @step 对功能进行注释。下面的示例展示了一个预处理数据的 @step 装饰功能。

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)

当你调用@step装饰过的函数时, SageMaker AI 会返回一个DelayedReturn实例,而不是运行该函数。DelayedReturn 实例是该功能实际返回值的代理。DelayedReturn 实例可以作为参数传递给其他函数,也可以作为步骤直接传递给管道实例。有关该DelayedReturn课程的信息,请参阅 sagemaker.workflow.function_step。 DelayedReturn

在两个步骤之间创建依赖关系时,就在管道图中的步骤之间创建了连接。下文将介绍在管道步骤之间创建依赖关系的多种方法。

将一个函数的DelayedReturn输出作为输入传递给另一个函数会自动在管道中创建数据依赖关系DAG。在下面的示例中,将 preprocess 函数的 DelayedReturn 输出传递给 train 函数会在 preprocesstrain 之间产生依赖关系。

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)

上一个示例定义了一个训练函数,该函数用 @step 修饰。调用该函数时,它会接收预处理管道步骤的 DelayedReturn 输出作为输入。调用训练函数会返回另一个 DelayedReturn 实例。此实例保存有关该函数中定义的所有先前步骤(即本示例中的preprocess步骤)的信息,这些步骤构成了管道DAG。

在上一个示例中,preprocess 函数返回一个值。有关列表或元组等更复杂的返回类型,请参阅 限制

在上一个示例中,train 函数接收了 preprocessDelayedReturn 输出,并创建了一个依赖关系。如果想明确定义依赖关系,而不传递前一步的输出结果,请在步骤中使用 add_depends_on 函数。您可以使用 get_step() 函数从其 DelayedReturn 实例中获取基础步骤,然后将依赖关系作为输入调用 add_depends_on_on。要查看 get_step() 函数定义,请参阅 sagemaker.workflow.step_outputs.get_step。下面的示例演示了如何使用 get_step()add_depends_on()preprocesstrain 之间创建依赖关系。

from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])

您可以创建一个包含 @step 装饰步骤和传统管道步骤的管道,并在两者之间传递数据。例如,您可以使用 ProcessingStep 处理数据,并将处理结果传递给 @step 装饰的训练函数。在下面的示例中,@step 装饰的训练步骤引用了处理步骤的输出。

# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a @step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )

使用 ConditionStep@step 装饰步骤

管道支持一个 ConditionStep 类,该类会评估前几个步骤的结果,以决定在管道中采取什么行动。您也可以在 @step 装饰的步骤中使用 ConditionStep。要使用 ConditionStep 中任何 @step 装饰步骤的输出,请将该步骤的输出作为 ConditionStep 的参数输入。在下面的示例中,条件步骤接收 @step 装饰模型评测步骤的输出。

# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )

使用步骤的 DelayedReturn 输出定义管道

无论是否使用 @step 装饰器,定义管道的方式都是一样的。向管道传递 DelayedReturn 实例时,无需传递完整的步骤列表来构建管道。会根据您定义的依赖关系SDK自动推断前面的步骤。您传递给管道的所有上一步骤 Step 对象或相关 DelayedReturn 对象都包含在管道图中。在下面的示例中,管道为 train 函数接收 DelayedReturn 对象。 SageMaker AI 将该preprocess步骤作为上一步添加到管道图中。train

from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )

如果步骤之间没有数据或自定义依赖关系,并且并行运行多个步骤,管道图就会有多个叶节点。如下例所示,将所有这些叶节点以列表形式传递给管道定义中的 steps 参数:

@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )

管道运行时,两个步骤并行。

您只需将图的叶节点传递给管道,因为叶节点包含通过数据或自定义依赖关系定义的所有先前步骤的信息。在编译管道时, SageMaker AI 还会推断出构成管道图的所有后续步骤,并将每个步骤作为单独的步骤添加到管道中。

创建管道

通过调用 pipeline.create() 创建管道,如以下代码所示。有关 create() 的详细信息,请参阅 sagemaker.workflow.pipeline.Pipeline.create

role = "pipeline-role" pipeline.create(role)

当你调用时pipeline.create(), SageMaker AI 会编译所有定义为管道实例一部分的步骤。 SageMaker AI 将序列化函数、参数和所有其他与步骤相关的项目上传到 Amazon S3。

数据按照以下结构存放在 S3 存储桶中:

s3_root_uri/ pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir) step_name/ timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the step execution_id/ step_name/ results # returned output from the serialized function including the model

s3_root_uri在 SageMaker AI 配置文件中定义,适用于整个管道。如果未定义,则使用默认 SageMaker AI 存储桶。

注意

每次 SageMaker AI 编译管道时, SageMaker AI 都会将步骤的序列化函数、参数和依赖项保存在带有当前时间戳的文件夹中。每次运行 pipeline.create()pipeline.update()pipeline.upsert()pipeline.definition() 时都会出现这种情况。