Pipelines 步骤
Pipelines 由多个步骤组成。这些步骤使用属性定义管道采取的操作以及各步骤之间的关系。下一页将介绍步骤的类型、属性以及它们之间的关系。
步骤属性
使用 properties
属性在管道中的步骤之间添加数据依赖关系。管道使用这些数据依赖关系来根据管道定义构建 DAG。这些属性可以作为占位符值引用,并在运行时解析。
Pipelines 步骤的 properties
属性与相应 SageMaker 作业类型的 Describe
调用所返回的对象相匹配。对于每种作业类型,Describe
调用都会返回以下响应对象:
-
ProcessingStep
– DescribeProcessingJob -
TrainingStep
– DescribeTrainingJob -
TransformStep
– DescribeTransformJob
要查看在创建数据依赖关系期间每种步骤类型可以引用哪些属性,请参阅 Amazon SageMaker Python SDK
步骤并行
当一个步骤不依赖于任何其他步骤时,它会在管道执行后立即运行。但是,并行执行过多的管道步骤可能会很快耗尽可用资源。使用 ParallelismConfiguration
控制管道执行的并发步骤数。
以下示例使用 ParallelismConfiguration
将并发步骤数限制设置为 5。
pipeline.create( parallelism_config=ParallelismConfiguration(5), )
步骤之间的数据依赖性
您可以通过指定步骤之间的数据关系来定义 DAG 的结构。要在步骤之间创建数据依赖关系,请将一个步骤的属性作为输入传递给管道中的另一个步骤。接收输入的步骤要等到提供输入的步骤完成运行后才会开始。
数据依赖关系使用以下格式的 JsonPath 表示法。这种格式会遍历 JSON 属性文件。这意味着您可以根据需要添加尽可能多的 <property>
实例,以便在文件中找到所需的嵌套属性。有关 JsonPath 表示法的更多信息,请参阅 JsonPath 存储库
<step_name>
.properties.<property>
.<property>
以下内容说明了如何使用处理步骤的 ProcessingOutputConfig
属性指定 Amazon S3 存储桶。
step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri
要创建数据依赖关系,请按如下方式将存储桶传递给训练步骤。
from sagemaker.workflow.pipeline_context import PipelineSession sklearn_train = SKLearn(..., sagemaker_session=PipelineSession()) step_train = TrainingStep( name="CensusTrain", step_args=sklearn_train.fit(inputs=TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train_data"].S3Output.S3Uri )) )
要查看在创建数据依赖关系期间每种步骤类型可以引用哪些属性,请参阅 Amazon SageMaker Python SDK
步骤之间的自定义依赖关系
指定数据依赖关系时,Pipelines 会在各步骤之间提供数据连接。另外,一个步骤可以访问前一个步骤的数据,而无需直接使用管道。在这种情况下,您可以创建一个自定义依赖关系,告诉 Pipelines 在另一个步骤运行完之前不要启动该步骤。您可以通过指定步骤的 DependsOn
属性来创建自定义依赖关系。
例如,以下内容定义了步骤 C
,该步骤在步骤 A
和步骤 B
都完成运行后才开始。
{ 'Steps': [ {'Name':'A', ...}, {'Name':'B', ...}, {'Name':'C', 'DependsOn': ['A', 'B']} ] }
如果依赖关系会产生循环依赖关系,Pipelines 会抛出验证异常。
以下示例创建了一个在处理步骤完成运行后开始的训练步骤。
processing_step = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step])
以下示例创建了一个训练步骤,该步骤要等到两个不同的处理步骤完成运行后才会开始。
processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step_1, processing_step_2])
下面提供了另一种创建自定义依赖关系的方法。
training_step.add_depends_on([processing_step_1]) training_step.add_depends_on([processing_step_2])
以下示例创建了一个训练步骤,该步骤接收来自一个处理步骤的输入,然后等待另一个处理步骤完成运行。
processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep( ..., inputs=TrainingInput( s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[ "train_data" ].S3Output.S3Uri ) training_step.add_depends_on([processing_step_2])
以下示例说明如何检索步骤的自定义依赖关系的字符串列表。
custom_dependencies = training_step.depends_on
自定义映像一步到位
在管道中创建步骤时,您可以使用任何可用的 SageMaker 深度学习容器映像
您还可以在管道步骤中使用自己的容器。由于无法在 Studio Classic 中创建映像,因此在使用 Pipelines 之前,必须使用其他方法创建映像。
要在为管道创建步骤时使用自己的容器,请在估算器定义中包含映像 URI。有关将您自己的容器与 SageMaker 配合使用的更多信息,请参阅将 Docker 容器与 SageMaker 配合使用。