@step
으로 데코레이션된 함수를 사용하여 파이프라인 만들기
@step
데코레이터를 사용하여 Python 함수를 파이프라인 단계로 변환하고, 이러한 함수 간에 종속성을 만들어 파이프라인 그래프(또는 방향성 비순환 그래프(DAG))를 만들고, 해당 그래프의 리프 노드를 파이프라인에 단계 목록으로 전달하여 파이프라인을 만들 수 있습니다. 다음 섹션에서는 예시와 함께 이 절차에 대해 자세히 설명합니다.
주제
함수를 단계로 변환
@step
데코레이터를 사용하여 단계를 만들려면 함수에 @step
을 주석으로 지정합니다. 다음 예시에서는 데이터를 사전 처리하는 @step
으로 데코레이션된 함수를 보여줍니다.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
@step
으로 데코레이션된 함수를 간접 호출하면 SageMaker는 함수를 실행하는 대신 DelayedReturn
인스턴스를 반환합니다. DelayedReturn
인스턴스는 해당 함수의 실제 반환에 대한 프록시입니다. DelayedReturn
인스턴스는 인수로 다른 함수에 전달하거나 파이프라인 인스턴스에 단계로 직접 전달할 수 있습니다. DelayedReturn
클래스에 대한 자세한 내용은 sagemaker.workflow.function_step.DelayedReturn
단계 간 종속성 만들기
두 단계 간에 종속성을 만들 때 파이프라인 그래프의 단계 간에 연결을 만듭니다. 다음 섹션에서는 파이프라인 단계 간에 종속성을 만들 수 있는 여러 방법을 소개합니다.
입력 인수를 통한 데이터 종속성
한 함수의 DelayedReturn
출력을 다른 함수에 대한 입력으로 전달하면 파이프라인 DAG에 데이터 종속성이 자동으로 만들어집니다. 다음 예시에서는 preprocess
함수의 DelayedReturn
출력을 train
함수에 전달하면 preprocess
와 train
사이에 종속성이 만들어집니다.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
이전 예시에서는 @step
으로 데코레이션된 훈련 함수를 정의합니다. 이 함수가 간접 호출되면 사전 처리 파이프라인 단계의 DelayedReturn
출력을 입력으로 수신합니다. 훈련 함수를 간접 호출하면 다른 DelayedReturn
인스턴스가 반환됩니다. 이 인스턴스는 파이프라인 DAG를 형성하는 해당 함수에 정의된 모든 이전 단계(예: 이 예시의 preprocess
단계)에 대한 정보를 보유합니다.
이전 예시에서 preprocess
함수는 단일 값을 반환합니다. 목록 또는 튜플과 같은 더 복잡한 반환 유형은 제한 사항 섹션을 참조하세요.
사용자 지정 종속성 정의
이전 예시에서 train
함수는 preprocess
의 DelayedReturn
출력을 수신하고 종속성을 만들었습니다. 이전 단계 출력을 전달하지 않고 종속성을 명시적으로 정의하려면 단계와 함께 add_depends_on
함수를 사용합니다. get_step()
함수를 사용하여 DelayedReturn
인스턴스에서 기본 단계를 검색한 다음, 종속성을 입력으로 사용하여 add_depends_on
_on을 직접 호출할 수 있습니다. get_step()
함수 정의를 보려면 sagemaker.workflow.step_outputs.get_stepget_step()
및 add_depends_on()
을 사용하여 preprocess
및 train
간의 종속성을 만드는 방법을 보여줍니다.
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
@step
으로 데코레이션된 함수에서 기존 파이프라인 단계로 데이터를 주고 받습니다.
@step
으로 데코레이션된 단계와 기존 파이프라인 단계가 포함된 파이프라인을 만들고 이들 간에 데이터를 전달할 수 있습니다. 예를 들어 ProcessingStep
을 사용하여 데이터를 처리하고 결과를 @step
으로 데코레이션된 훈련 함수에 전달할 수 있습니다. 다음 예시에서는 @step
으로 데코레이션된 훈련 단계가 처리 단계의 출력을 참조합니다.
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=
input_data
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a
@step
-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
@step
으로 데코레이션된 단계와 함께 ConditionStep
사용
Pipelines은 파이프라인에서 수행할 작업을 결정하기 위해 이전 단계의 결과를 평가하는 ConditionStep
클래스를 지원합니다. @step
으로 데코레이션된 단계와 함께 ConditionStep
을 사용할 수도 있습니다. ConditionStep
으로 데코레이션된 단계와 함께 @step
을 사용하려면 해당 단계의 출력을 ConditionStep
에 대한 인수로 입력합니다. 다음 예시에서는 조건 단계가 @step
으로 데코레이션된 모델 평가 단계의 출력을 수신합니다.
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
단계의 DelayedReturn
출력을 사용하여 파이프라인 정의
@step
데코레이터 사용 여부와 관계없이 파이프라인을 동일한 방식으로 정의합니다. DelayedReturn
인스턴스를 파이프라인에 전달할 때 파이프라인을 빌드하기 위해 전체 단계 목록을 전달할 필요가 없습니다. SDK는 사용자가 정의한 종속성을 기반으로 이전 단계를 자동으로 추론합니다. 파이프라인에 전달한 Step
객체 또는 DelayedReturn
객체의 이전 단계는 모두 파이프라인 그래프에 포함됩니다. 다음 예시에서는 파이프라인이 train
함수에 대한 DelayedReturn
객체를 수신합니다. SageMaker는 파이프라인 그래프에 train
이전 단계로 preprocess
단계를 추가합니다.
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_train_result], sagemaker_session=<sagemaker-session>
, )
단계 간에 데이터 또는 사용자 지정 종속성이 없고 여러 단계를 병렬로 실행하는 경우 파이프라인 그래프에 둘 이상의 리프 노드가 있습니다. 다음 예시와 같이 목록에 있는 이러한 모든 리프 노드를 파이프라인 정의의 steps
인수에 전달합니다.
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session
, )
파이프라인이 실행되면 두 단계가 병렬로 실행됩니다.
리프 노드에는 데이터 또는 사용자 지정 종속성을 통해 정의된 모든 이전 단계에 대한 정보가 포함되어 있으므로 그래프의 리프 노드만 파이프라인에 전달합니다. 파이프라인을 컴파일할 때 SageMaker는 파이프라인 그래프를 구성하는 모든 후속 단계를 추론하고 각 단계를 파이프라인에 별도의 단계로 추가합니다.
파이프라인 생성
다음 코드 조각과 같이 pipeline.create()
를 직접 호출하여 파이프라인을 만듭니다. create()
에 대한 자세한 내용은 sagemaker.workflow.pipeline.Pipeline.create
role = "
pipeline-role
" pipeline.create(role)
pipeline.create()
를 직접 호출하면 SageMaker는 파이프라인 인스턴스의 일부로 정의된 모든 단계를 컴파일합니다. SageMaker는 직렬화된 함수, 인수 및 기타 모든 단계 관련 아티팩트를 Amazon S3에 업로드합니다.
데이터는 다음 구조에 따라 S3 버킷에 상주합니다.
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
는 SageMaker 구성 파일에 정의되어 있으며 전체 파이프라인에 적용됩니다. 정의되지 않은 경우 기본 SageMaker 버킷이 사용됩니다.
참고
SageMaker가 파이프라인을 컴파일할 때마다 SageMaker는 타임스탬프가 현재 시간인 폴더에 단계의 직렬화된 함수, 인수 및 종속성을 저장합니다. 이는 pipeline.create()
, pipeline.update()
, pipeline.upsert()
또는 pipeline.definition()
을 실행할 때마다 발생합니다.