@step장식 함수가 있는 파이프라인 생성 - Amazon SageMaker

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

@step장식 함수가 있는 파이프라인 생성

@step 데코레이터를 사용하여 Python 함수를 파이프라인 단계로 변환하고, 이러한 함수 간에 종속성을 생성하여 파이프라인 그래프(또는 지시된 비순환 그래프(DAG))를 생성하고, 해당 그래프의 리프 노드를 파이프라인에 단계 목록으로 전달하여 파이프라인을 생성할 수 있습니다. 다음 섹션에서는 예제와 함께 이 절차에 대해 자세히 설명합니다.

함수를 단계로 변환

@step 데코레이터를 사용하여 단계를 생성하려면 함수에 를 주석으로 지정합니다@step. 다음 예제는 데이터를 사전 처리하는 @step데코레이션된 함수를 보여줍니다.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)

@step장식 함수를 호출하면 는 함수를 실행하는 대신 DelayedReturn 인스턴스를 SageMaker 반환합니다. DelayedReturn 인스턴스는 해당 함수의 실제 반환을 위한 프록시입니다. DelayedReturn 인스턴스는 인수로 다른 함수에 전달되거나 파이프라인 인스턴스에 단계로 직접 전달될 수 있습니다. DelayedReturn 클래스에 대한 자세한 내용은 sagemaker.workflow.function_step을 참조하세요DelayedReturn.

두 단계 간에 종속성을 생성할 때 파이프라인 그래프의 단계 간에 연결을 생성합니다. 다음 섹션에서는 파이프라인 단계 간에 종속성을 생성할 수 있는 여러 방법을 소개합니다.

한 함수의 DelayedReturn 출력을 다른 함수에 대한 입력으로 전달하면 파이프라인에 데이터 종속성이 자동으로 생성됩니다DAG. 다음 예제에서는 preprocess 함수의 DelayedReturn 출력을 train 함수에 전달하면 preprocess 및 사이의 종속성이 생성됩니다train.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)

이전 예제에서는 로 장식된 훈련 함수를 정의합니다@step. 이 함수가 호출되면 사전 처리 파이프라인 단계의 DelayedReturn 출력을 입력으로 수신합니다. 훈련 함수를 호출하면 다른 DelayedReturn 인스턴스가 반환됩니다. 이 인스턴스는 파이프라인을 형성하는 해당 함수에 정의된 모든 이전 단계(예: 이 예제의 preprocess 단계)에 대한 정보를 보유합니다DAG.

이전 예제에서 preprocess 함수는 단일 값을 반환합니다. 목록 또는 튜플과 같은 더 복잡한 반환 유형은 섹션을 참조하세요제한 사항.

이전 예제에서 train 함수는 의 DelayedReturn 출력을 수신preprocess하고 종속성을 생성했습니다. 이전 단계 출력을 전달하지 않고 종속성을 명시적으로 정의하려면 단계와 함께 add_depends_on 함수를 사용합니다. get_step() 함수를 사용하여 DelayedReturn 인스턴스에서 기본 단계를 검색한 다음 종속성을 입력으로 사용하여 add_depends_on_on을 호출할 수 있습니다. get_step() 함수 정의를 보려면 sagemaker.workflow.step_outputs.get_step을 참조하세요. 다음 예제에서는 preprocess 및 를 train 사용하여 get_step() 및 간의 종속성을 생성하는 방법을 보여줍니다add_depends_on().

from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])

@step장식 단계와 기존 파이프라인 단계가 포함된 파이프라인을 생성하고 이들 간에 데이터를 전달할 수 있습니다. 예를 들어 ProcessingStep를 사용하여 데이터를 처리하고 결과를 @step장식 훈련 함수에 전달할 수 있습니다. 다음 예제에서는 @step장식 훈련 단계가 처리 단계의 출력을 참조합니다.

# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a @step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )

@step데코레이션된 단계ConditionStep와 함께 사용

파이프라인은 파이프라인에서 수행할 작업을 결정하기 위해 이전 단계의 결과를 평가하는 ConditionStep 클래스를 지원합니다. @step를 장식 단계ConditionStep와 함께 사용할 수도 있습니다. 에서 @step장식 단계의 출력을 사용하려면 해당 단계의 출력을 에 대한 인수로 ConditionStep입력합니다ConditionStep. 다음 예제에서는 조건 단계가 @step-장식된 모델 평가 단계의 출력을 수신합니다.

# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )

단계 DelayedReturn 출력을 사용하여 파이프라인 정의

@step 데코레이터 사용 여부에 관계없이 파이프라인을 동일한 방식으로 정의합니다. DelayedReturn 인스턴스를 파이프라인에 전달할 때 파이프라인을 빌드하기 위한 전체 단계 목록을 전달할 필요가 없습니다. 는 사용자가 정의한 종속성을 기반으로 이전 단계를 SDK 자동으로 추론합니다. 파이프라인에 전달한 Step 객체 또는 DelayedReturn 객체의 이전 단계는 모두 파이프라인 그래프에 포함됩니다. 다음 예제에서는 파이프라인이 train 함수에 대한 DelayedReturn 객체를 수신합니다. SageMaker 는 파이프라인 train그래프에 preprocess 이전 단계인 단계를 추가합니다.

from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )

단계 간에 데이터 또는 사용자 지정 종속성이 없고 여러 단계를 병렬로 실행하는 경우 파이프라인 그래프에 둘 이상의 리프 노드가 있습니다. 다음 예제와 같이 목록에 있는 이러한 리프 노드를 모두 파이프라인 정의의 steps 인수에 전달합니다.

@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )

파이프라인이 실행되면 두 단계가 병렬로 실행됩니다.

리프 노드에는 데이터 또는 사용자 지정 종속성을 통해 정의된 모든 이전 단계에 대한 정보가 포함되어 있으므로 그래프의 리프 노드만 파이프라인에 전달합니다. 파이프라인을 컴파일할 때 SageMaker 파이프라인 그래프를 구성하는 모든 후속 단계를 추론하고 각 단계를 파이프라인에 별도의 단계로 추가합니다.

파이프라인 생성

다음 조각과 pipeline.create()같이 를 호출하여 파이프라인을 생성합니다. 에 대한 자세한 내용은 sagemaker.workflow.pipeline.Pipeline.create를 create()참조하세요.

role = "pipeline-role" pipeline.create(role)

를 호출하면 파이프라인 인스턴스의 일부로 정의된 모든 단계를 pipeline.create() SageMaker 컴파일합니다. 는 직렬화된 함수, 인수 및 기타 모든 단계 관련 아티팩트를 Amazon S3에 SageMaker 업로드합니다.

데이터는 다음 구조에 따라 S3 버킷에 상주합니다.

s3_root_uri/ pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir) step_name/ timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the step execution_id/ step_name/ results # returned output from the serialized function including the model

s3_root_uri 는 SageMaker 구성 파일에 정의되어 있으며 전체 파이프라인에 적용됩니다. 정의되지 않은 경우 기본 SageMaker 버킷이 사용됩니다.

참고

가 파이프라인을 SageMaker 컴파일할 때마다 단계의 직렬화된 함수, 인수 및 종속성을 현재 시간이 지정된 폴더에 SageMaker 저장합니다. 이는 pipeline.create(), pipeline.upsert() 또는 pipeline.update()를 실행할 때마다 발생합니다pipeline.definition().