本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
创建带有@step
装饰函数的管道
您可以使用@step
装饰器将 Python 函数转换为工作流步骤,在这些函数之间创建依赖关系以创建流水线图(或有向无环图 (DAG)),然后将该图的叶节点作为步骤列表传递给管道,从而创建管道。以下各节通过示例详细说明了此过程。
将函数转换为步骤
要使用@step
装饰器创建步骤,请使用注释该函数。@step
以下示例显示了一个预处理@step
数据的经过装饰的函数。
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
当您调用@step
装饰函数时,会 SageMaker 返回一个DelayedReturn
实例,而不是运行该函数。DelayedReturn
实例是该函数实际返回的代理。该DelayedReturn
实例可以作为参数传递给另一个函数,也可以作为步骤直接传递给管道实例。有关该DelayedReturn
课程的信息,请参阅 sagemaker.workflow.function_step。 DelayedReturn
在步骤之间创建依赖关系
当您在两个步骤之间创建依赖关系时,将在工作流图中的步骤之间创建连接。以下各节介绍了在工作流步骤之间创建依赖关系的多种方法。
通过输入参数实现的数据依赖性
将一个函数的DelayedReturn
输出作为输入传递给另一个函数会自动在管道中创建数据依赖关系DAG。在以下示例中,将preprocess
函数的DelayedReturn
输出传递给函数会在train
preprocess
和之间创建依赖关系train
。
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
前面的示例定义了一个用装饰的训练函数@step
。调用此函数时,它将接收预处理管道步骤的DelayedReturn
输出作为输入。调用训练函数会返回另一个DelayedReturn
实例。此实例保存有关该函数中定义的所有先前步骤(即本示例中的preprocess
步骤)的信息,这些步骤构成了管道DAG。
在前面的示例中,该preprocess
函数返回单个值。有关列表或元组等更复杂的返回类型,请参阅。限制
定义自定义依赖关系
在前面的示例中,该train
函数接收了的DelayedReturn
输出preprocess
并创建了一个依赖关系。如果要在不传递上一步输出的情况下显式定义依赖关系,请将该add_depends_on
函数与步骤一起使用。您可以使用该get_step()
函数从其DelayedReturn
实例中检索底层步骤,然后使用依赖项作为输入调用 add_depends_on
_on。要查看get_step()
函数定义,请参阅 sagemaker.workflow.step_outputstrain
使用preprocess
get_step()
和在和之间创建依赖关系add_depends_on()
。
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
将数据传入和从@step
装饰过的函数传递到传统的流水线步骤
您可以创建一个包含@step
装饰步骤和传统工作流步骤的管道,并在它们之间传递数据。例如,您可以使用ProcessingStep
处理数据并将其结果传递给经过@step
装饰的训练函数。在以下示例中,经过@step
装饰的训练步骤引用了处理步骤的输出。
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=
input_data
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a
@step
-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
与@step
装饰过ConditionStep
的台阶一起使用
Pipelines 支持一个ConditionStep
类,该类可以评估前面步骤的结果,以决定在管道中执行什么操作。你也可以ConditionStep
用@step
装饰过的台阶来使用。要将任何@step
装饰步骤的输出与一起使用ConditionStep
,请将该步骤的输出作为参数输入。ConditionStep
在以下示例中,条件步骤接收经过@step
装饰的模型评估步骤的输出。
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
使用步骤DelayedReturn
输出定义管道
无论你是否使用@step
装饰器,你定义管道的方式都是一样的。当您将DelayedReturn
实例传递给管道时,您无需传递完整的步骤列表即可构建管道。会根据您定义的依赖关系SDK自动推断前面的步骤。您传递给管道的Step
对象或DelayedReturn
对象的所有先前步骤都包含在管道图中。在以下示例中,管道接收train
函数的DelayedReturn
对象。 SageMaker 将该preprocess
步骤作为之前的train
步骤添加到管道图中。
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_train_result], sagemaker_session=<sagemaker-session>
, )
如果步骤之间没有数据或自定义依赖关系,并且您并行运行多个步骤,则管道图具有多个叶节点。将列表中的所有这些叶节点传递给管道定义中的steps
参数,如以下示例所示:
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session
, )
当管道运行时,两个步骤并行运行。
您只能将图表的叶节点传递给管道,因为叶节点包含有关通过数据或自定义依赖关系定义的所有先前步骤的信息。当它编译管道时, SageMaker 还会推断出构成管道图的所有后续步骤,并将每个步骤作为单独的步骤添加到管道中。
创建管道
通过调用创建管道pipeline.create()
,如以下代码段所示。有关详细信息,请参阅 sagcreate()
.pipeline.pipeline.create。
role = "
pipeline-role
" pipeline.create(role)
当您调用时pipeline.create()
, SageMaker 会编译所有定义为管道实例一部分的步骤。 SageMaker 将序列化函数、参数和所有其他与步骤相关的项目上传到 Amazon S3。
数据根据以下结构驻留在 S3 存储桶中:
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
在 SageMaker 配置文件中定义,适用于整个管道。如果未定义,则使用默认 SageMaker 存储桶。
注意
每次 SageMaker 编译管道时,都会将步骤的序列化函数、参数和依赖项 SageMaker 保存在带有当前时间戳的文件夹中。每次运行pipeline.create()
pipeline.update()
、pipeline.upsert()
或时都会发生这种情况pipeline.definition()
。