使用 @step裝飾函數建立管道 - Amazon SageMaker

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 @step裝飾函數建立管道

您可以使用@step裝飾器將 Python 函數轉換為管道步驟,在這些函數之間建立相依性以建立管道圖 (或定向非循環圖 (DAG)),並將該圖的分葉節點作為步驟清單傳遞至管道,藉此建立管道。下列各節使用範例詳細解釋此程序。

將函數轉換為步驟

若要使用@step裝飾器建立步驟,請使用 註釋函數@step。下列範例顯示預先處理資料的 @step裝飾函數。

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)

當您叫用 @step裝飾的函數時, 會 SageMaker 傳回DelayedReturn執行個體,而不是執行該函數。DelayedReturn 執行個體是該函數實際傳回的代理。DelayedReturn 執行個體可以作為引數傳遞至另一個 函數,或直接傳遞至管道執行個體作為步驟。如需有關 DelayedReturn類別的資訊,請參閱 sagemaker.workflow.function_step。DelayedReturn

當您在兩個步驟之間建立相依性時,您可以在管道圖表中的步驟之間建立連線。以下各節介紹多種方法,您可以在管道步驟之間建立相依性。

將某個函數的DelayedReturn輸出作為另一個函數的輸入,會自動在管道 中建立資料相依性DAG。在下列範例中,將preprocess函數的DelayedReturn輸出傳遞至train函數會在 preprocess和 之間建立相依性train

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)

上一個範例定義了以 裝飾的訓練函數@step。叫用此函數時,它會接收預處理管道步驟的DelayedReturn輸出作為輸入。叫用訓練函數會傳回另一個DelayedReturn執行個體。此執行個體會保留構成管道 之函數 中定義之所有先前步驟 (即此範例中preprocess的步驟) 的相關資訊DAG。

在上一個範例中,preprocess函數會傳回單一值。如需更複雜的傳回類型,例如清單或組合,請參閱 限制

在上一個範例中,train函數收到 的DelayedReturn輸出preprocess並建立相依性。如果您想要明確定義相依性,而不傳遞上一個步驟輸出,請使用 add_depends_on函數搭配 步驟。您可以使用 get_step()函數從其DelayedReturn執行個體擷取基礎步驟,然後使用相依性作為輸入呼叫 add_depends_on_on。若要檢視get_step()函數定義,請參閱 sagemaker.workflow.step_outputs.get_step 。下列範例示範如何使用 preprocesstrain get_step() 之間建立相依性add_depends_on()

from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])

您可以建立包含 @step裝飾步驟和傳統管道步驟的管道,並在它們之間傳遞資料。例如,您可以使用 ProcessingStep 來處理資料,並將結果傳遞給 @step裝飾的訓練函數。在下列範例中, @step裝飾的訓練步驟參考處理步驟的輸出。

# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a @step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )

ConditionStep 搭配 @step裝飾步驟使用

管道支援一個類別,該ConditionStep類別評估上述步驟的結果,以決定管道中應採取的動作。您也可以ConditionStep搭配 @step裝飾步驟使用 。若要將任何 @step裝飾步驟的輸出與 搭配使用ConditionStep,請輸入該步驟的輸出作為 的引數ConditionStep。在下列範例中,條件步驟會收到 @step裝飾模型評估步驟的輸出。

# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )

使用步驟的DelayedReturn輸出定義管道

無論您是否使用@step裝飾器,您都以相同的方式定義管道。當您將DelayedReturn執行個體傳遞至管道時,您不需要傳遞建構管道的完整步驟清單。會根據您定義的相依性SDK自動推斷先前的步驟。您傳遞至管道或Step物件的所有先前步驟DelayedReturn都包含在管道圖表中。在下列範例中,管道會接收train函數的DelayedReturn物件。 SageMaker 會將preprocess步驟作為 的上一個步驟train新增至管道圖表。

from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )

如果步驟之間沒有資料或自訂相依性,而您並行執行多個步驟,則管道圖表具有多個分葉節點。將清單中的所有這些分葉節點傳遞至管道定義中的steps引數,如下列範例所示:

@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )

當管道執行時,這兩個步驟會平行執行。

您只能將圖形的分葉節點傳遞至管道,因為分葉節點包含透過資料或自訂相依性定義之所有先前步驟的相關資訊。編譯管道時, SageMaker 也會推斷形成管道圖表的所有後續步驟,並將每個步驟新增為管道的個別步驟。

建立管道

呼叫 來建立管道pipeline.create(),如下列程式碼片段所示。如需 的詳細資訊create(),請參閱 sagemaker.workflow.pipeline.Pipeline.create

role = "pipeline-role" pipeline.create(role)

當您呼叫 時pipeline.create(), SageMaker 會編譯定義為管道執行個體一部分的所有步驟。 會將序列化函數、引數和所有其他步驟相關成品 SageMaker 上傳到 Amazon S3。

根據下列結構,資料存放在 S3 儲存貯體中:

s3_root_uri/ pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir) step_name/ timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the step execution_id/ step_name/ results # returned output from the serialized function including the model

s3_root_uri 會在 SageMaker 組態檔案中定義,並套用至整個管道。如果未定義,則會使用預設 SageMaker 儲存貯體。

注意

每次 SageMaker 編譯管道時, 都會將步驟的序列化函數、引數和相依性 SageMaker 儲存在以目前時間加上時間戳記的資料夾。每次執行 pipeline.create()pipeline.update()pipeline.upsert()或 時都會發生這種情況pipeline.definition()