本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 @step
裝飾函數建立管道
您可以使用@step
裝飾器將 Python 函數轉換為管道步驟,在這些函數之間建立相依性以建立管道圖 (或定向非循環圖 (DAG)),並將該圖的分葉節點作為步驟清單傳遞至管道,藉此建立管道。下列各節使用範例詳細解釋此程序。
將函數轉換為步驟
若要使用@step
裝飾器建立步驟,請使用 註釋函數@step
。下列範例顯示預先處理資料的 @step
裝飾函數。
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
當您叫用 @step
裝飾的函數時, 會 SageMaker 傳回DelayedReturn
執行個體,而不是執行該函數。DelayedReturn
執行個體是該函數實際傳回的代理。DelayedReturn
執行個體可以作為引數傳遞至另一個 函數,或直接傳遞至管道執行個體作為步驟。如需有關 DelayedReturn
類別的資訊,請參閱 sagemaker.workflow.function_step。DelayedReturn
在步驟之間建立相依性
當您在兩個步驟之間建立相依性時,您可以在管道圖表中的步驟之間建立連線。以下各節介紹多種方法,您可以在管道步驟之間建立相依性。
透過輸入引數的資料相依性
將某個函數的DelayedReturn
輸出作為另一個函數的輸入,會自動在管道 中建立資料相依性DAG。在下列範例中,將preprocess
函數的DelayedReturn
輸出傳遞至train
函數會在 preprocess
和 之間建立相依性train
。
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
上一個範例定義了以 裝飾的訓練函數@step
。叫用此函數時,它會接收預處理管道步驟的DelayedReturn
輸出作為輸入。叫用訓練函數會傳回另一個DelayedReturn
執行個體。此執行個體會保留構成管道 之函數 中定義之所有先前步驟 (即此範例中preprocess
的步驟) 的相關資訊DAG。
在上一個範例中,preprocess
函數會傳回單一值。如需更複雜的傳回類型,例如清單或組合,請參閱 限制。
定義自訂相依性
在上一個範例中,train
函數收到 的DelayedReturn
輸出preprocess
並建立相依性。如果您想要明確定義相依性,而不傳遞上一個步驟輸出,請使用 add_depends_on
函數搭配 步驟。您可以使用 get_step()
函數從其DelayedReturn
執行個體擷取基礎步驟,然後使用相依性作為輸入呼叫 add_depends_on
_on。若要檢視get_step()
函數定義,請參閱 sagemaker.workflow.step_outputs.get_step preprocess
和 train
get_step()
之間建立相依性add_depends_on()
。
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
將資料從 @step
裝飾的函數傳遞至傳統管道步驟
您可以建立包含 @step
裝飾步驟和傳統管道步驟的管道,並在它們之間傳遞資料。例如,您可以使用 ProcessingStep
來處理資料,並將結果傳遞給 @step
裝飾的訓練函數。在下列範例中, @step
裝飾的訓練步驟參考處理步驟的輸出。
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=
input_data
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a
@step
-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
ConditionStep
搭配 @step
裝飾步驟使用
管道支援一個類別,該ConditionStep
類別評估上述步驟的結果,以決定管道中應採取的動作。您也可以ConditionStep
搭配 @step
裝飾步驟使用 。若要將任何 @step
裝飾步驟的輸出與 搭配使用ConditionStep
,請輸入該步驟的輸出作為 的引數ConditionStep
。在下列範例中,條件步驟會收到 @step
裝飾模型評估步驟的輸出。
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
使用步驟的DelayedReturn
輸出定義管道
無論您是否使用@step
裝飾器,您都以相同的方式定義管道。當您將DelayedReturn
執行個體傳遞至管道時,您不需要傳遞建構管道的完整步驟清單。會根據您定義的相依性SDK自動推斷先前的步驟。您傳遞至管道或Step
物件的所有先前步驟DelayedReturn
都包含在管道圖表中。在下列範例中,管道會接收train
函數的DelayedReturn
物件。 SageMaker 會將preprocess
步驟作為 的上一個步驟train
新增至管道圖表。
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_train_result], sagemaker_session=<sagemaker-session>
, )
如果步驟之間沒有資料或自訂相依性,而您並行執行多個步驟,則管道圖表具有多個分葉節點。將清單中的所有這些分葉節點傳遞至管道定義中的steps
引數,如下列範例所示:
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session
, )
當管道執行時,這兩個步驟會平行執行。
您只能將圖形的分葉節點傳遞至管道,因為分葉節點包含透過資料或自訂相依性定義之所有先前步驟的相關資訊。編譯管道時, SageMaker 也會推斷形成管道圖表的所有後續步驟,並將每個步驟新增為管道的個別步驟。
建立管道
呼叫 來建立管道pipeline.create()
,如下列程式碼片段所示。如需 的詳細資訊create()
,請參閱 sagemaker.workflow.pipeline.Pipeline.create
role = "
pipeline-role
" pipeline.create(role)
當您呼叫 時pipeline.create()
, SageMaker 會編譯定義為管道執行個體一部分的所有步驟。 會將序列化函數、引數和所有其他步驟相關成品 SageMaker 上傳到 Amazon S3。
根據下列結構,資料存放在 S3 儲存貯體中:
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
會在 SageMaker 組態檔案中定義,並套用至整個管道。如果未定義,則會使用預設 SageMaker 儲存貯體。
注意
每次 SageMaker 編譯管道時, 都會將步驟的序列化函數、引數和相依性 SageMaker 儲存在以目前時間加上時間戳記的資料夾。每次執行 pipeline.create()
、 pipeline.update()
pipeline.upsert()
或 時都會發生這種情況pipeline.definition()
。