기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
@step
장식 함수가 있는 파이프라인 생성
@step
데코레이터를 사용하여 Python 함수를 파이프라인 단계로 변환하고, 이러한 함수 간에 종속성을 생성하여 파이프라인 그래프(또는 지시된 비순환 그래프(DAG))를 생성하고, 해당 그래프의 리프 노드를 파이프라인에 단계 목록으로 전달하여 파이프라인을 생성할 수 있습니다. 다음 섹션에서는 예제와 함께 이 절차에 대해 자세히 설명합니다.
함수를 단계로 변환
@step
데코레이터를 사용하여 단계를 생성하려면 함수에 를 주석으로 지정합니다@step
. 다음 예제는 데이터를 사전 처리하는 @step
데코레이션된 함수를 보여줍니다.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
@step
장식 함수를 호출하면 는 함수를 실행하는 대신 DelayedReturn
인스턴스를 SageMaker 반환합니다. DelayedReturn
인스턴스는 해당 함수의 실제 반환을 위한 프록시입니다. DelayedReturn
인스턴스는 인수로 다른 함수에 전달되거나 파이프라인 인스턴스에 단계로 직접 전달될 수 있습니다. DelayedReturn
클래스에 대한 자세한 내용은 sagemaker.workflow.function_step을 참조하세요DelayedReturn
단계 간 종속성 생성
두 단계 간에 종속성을 생성할 때 파이프라인 그래프의 단계 간에 연결을 생성합니다. 다음 섹션에서는 파이프라인 단계 간에 종속성을 생성할 수 있는 여러 방법을 소개합니다.
입력 인수를 통한 데이터 종속성
한 함수의 DelayedReturn
출력을 다른 함수에 대한 입력으로 전달하면 파이프라인에 데이터 종속성이 자동으로 생성됩니다DAG. 다음 예제에서는 preprocess
함수의 DelayedReturn
출력을 train
함수에 전달하면 preprocess
및 사이의 종속성이 생성됩니다train
.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
이전 예제에서는 로 장식된 훈련 함수를 정의합니다@step
. 이 함수가 호출되면 사전 처리 파이프라인 단계의 DelayedReturn
출력을 입력으로 수신합니다. 훈련 함수를 호출하면 다른 DelayedReturn
인스턴스가 반환됩니다. 이 인스턴스는 파이프라인을 형성하는 해당 함수에 정의된 모든 이전 단계(예: 이 예제의 preprocess
단계)에 대한 정보를 보유합니다DAG.
이전 예제에서 preprocess
함수는 단일 값을 반환합니다. 목록 또는 튜플과 같은 더 복잡한 반환 유형은 섹션을 참조하세요제한 사항.
사용자 지정 종속성 정의
이전 예제에서 train
함수는 의 DelayedReturn
출력을 수신preprocess
하고 종속성을 생성했습니다. 이전 단계 출력을 전달하지 않고 종속성을 명시적으로 정의하려면 단계와 함께 add_depends_on
함수를 사용합니다. get_step()
함수를 사용하여 DelayedReturn
인스턴스에서 기본 단계를 검색한 다음 종속성을 입력으로 사용하여 add_depends_on
_on을 호출할 수 있습니다. get_step()
함수 정의를 보려면 sagemaker.workflow.step_outputs.get_step을preprocess
및 를 train
사용하여 get_step()
및 간의 종속성을 생성하는 방법을 보여줍니다add_depends_on()
.
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
@step
데코레이션된 함수에서 기존 파이프라인 단계로 데이터를 주고 받습니다.
@step
장식 단계와 기존 파이프라인 단계가 포함된 파이프라인을 생성하고 이들 간에 데이터를 전달할 수 있습니다. 예를 들어 ProcessingStep
를 사용하여 데이터를 처리하고 결과를 @step
장식 훈련 함수에 전달할 수 있습니다. 다음 예제에서는 @step
장식 훈련 단계가 처리 단계의 출력을 참조합니다.
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=
input_data
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a
@step
-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
@step
데코레이션된 단계ConditionStep
와 함께 사용
파이프라인은 파이프라인에서 수행할 작업을 결정하기 위해 이전 단계의 결과를 평가하는 ConditionStep
클래스를 지원합니다. @step
를 장식 단계ConditionStep
와 함께 사용할 수도 있습니다. 에서 @step
장식 단계의 출력을 사용하려면 해당 단계의 출력을 에 대한 인수로 ConditionStep
입력합니다ConditionStep
. 다음 예제에서는 조건 단계가 @step
-장식된 모델 평가 단계의 출력을 수신합니다.
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
단계 DelayedReturn
출력을 사용하여 파이프라인 정의
@step
데코레이터 사용 여부에 관계없이 파이프라인을 동일한 방식으로 정의합니다. DelayedReturn
인스턴스를 파이프라인에 전달할 때 파이프라인을 빌드하기 위한 전체 단계 목록을 전달할 필요가 없습니다. 는 사용자가 정의한 종속성을 기반으로 이전 단계를 SDK 자동으로 추론합니다. 파이프라인에 전달한 Step
객체 또는 DelayedReturn
객체의 이전 단계는 모두 파이프라인 그래프에 포함됩니다. 다음 예제에서는 파이프라인이 train
함수에 대한 DelayedReturn
객체를 수신합니다. SageMaker 는 파이프라인 train
그래프에 preprocess
이전 단계인 단계를 추가합니다.
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_train_result], sagemaker_session=<sagemaker-session>
, )
단계 간에 데이터 또는 사용자 지정 종속성이 없고 여러 단계를 병렬로 실행하는 경우 파이프라인 그래프에 둘 이상의 리프 노드가 있습니다. 다음 예제와 같이 목록에 있는 이러한 리프 노드를 모두 파이프라인 정의의 steps
인수에 전달합니다.
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session
, )
파이프라인이 실행되면 두 단계가 병렬로 실행됩니다.
리프 노드에는 데이터 또는 사용자 지정 종속성을 통해 정의된 모든 이전 단계에 대한 정보가 포함되어 있으므로 그래프의 리프 노드만 파이프라인에 전달합니다. 파이프라인을 컴파일할 때 SageMaker 파이프라인 그래프를 구성하는 모든 후속 단계를 추론하고 각 단계를 파이프라인에 별도의 단계로 추가합니다.
파이프라인 생성
다음 조각과 pipeline.create()
같이 를 호출하여 파이프라인을 생성합니다. 에 대한 자세한 내용은 sagemaker.workflow.pipeline.Pipeline.create를create()
참조하세요.
role = "
pipeline-role
" pipeline.create(role)
를 호출하면 파이프라인 인스턴스의 일부로 정의된 모든 단계를 pipeline.create()
SageMaker 컴파일합니다. 는 직렬화된 함수, 인수 및 기타 모든 단계 관련 아티팩트를 Amazon S3에 SageMaker 업로드합니다.
데이터는 다음 구조에 따라 S3 버킷에 상주합니다.
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
는 SageMaker 구성 파일에 정의되어 있으며 전체 파이프라인에 적용됩니다. 정의되지 않은 경우 기본 SageMaker 버킷이 사용됩니다.
참고
가 파이프라인을 SageMaker 컴파일할 때마다 단계의 직렬화된 함수, 인수 및 종속성을 현재 시간이 지정된 폴더에 SageMaker 저장합니다. 이는 pipeline.create()
, pipeline.upsert()
또는 pipeline.update()
를 실행할 때마다 발생합니다pipeline.definition()
.