翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
@step
でデコレートした関数を使用してパイプラインを作成する
@step
デコレータを使用して Python 関数をパイプラインステップに変換し、それらの関数間に依存関係を作成してパイプライングラフ (または有向非巡回グラフ (DAG)) を作成して、このグラフのリーフノードをステップのリストとしてパイプラインに渡すことで、パイプラインを作成できます。以下のセクションでは、この手順を例を使って詳しく説明します。
トピック
関数をステップに変換する
@step
デコレータを使用してステップを作成するには、@step
関数を使用して注釈を付けます。次の例は、データを事前処理する @step
でデコレートした関数を説明しています。
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
で@step
デコレートされた関数を呼び出すと、SageMaker AI は関数を実行する代わりにDelayedReturn
インスタンスを返します。DelayedReturn
インスタンスは、その関数の実際の戻り値のプロキシです。DelayedReturn
インスタンスは、別の関数に引数として渡すことも、ステップとしてパイプラインインスタンスに直接渡すこともできます。DelayedReturn
クラスの詳細については、「sagemaker.workflow.function_step.DelayedReturn
ステップ間の依存関係を作成する
2 つのステップ間に依存関係を作成する際は、パイプライングラフのステップ間に接続を作成します。以下のセクションでは、パイプラインステップ間の依存関係を作成する複数の方法を紹介します。
入力引数を使用したデータ依存関係
ある関数の DelayedReturn
出力を別の関数への入力として渡すと、パイプライン DAG にデータ依存関係が自動的に作成されます。次の例では、preprocess
関数の DelayedReturn
出力を train
関数に渡す preprocess
と、train
との間に依存関係が作成されます。
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
前の例では、@step
でデコレートされたトレーニング関数を定義しています。この関数が呼び出されると、前処理パイプラインステップの DelayedReturn
出力を入力として受け取ります。トレーニング関数を呼び出すと、別の DelayedReturn
インスタンスが返されます。このインスタンスは、パイプライン DAG を形成する、その関数で定義された以前のすべてのステップ (この例の preprocess
ステップ) に関する情報を保持します。
前の例では、preprocess
関数は単一の値を返します。リストやタプルなどのより複雑な戻り値タイプについては、「制限」を参照してください。
カスタム依存関係を定義する
前の例では、train
関数は preprocess
の DelayedReturn
出力を受け取り、依存関係を作成しました。前のステップの出力を渡さずに依存関係を明示的に定義する場合は、add_depends_on
関数をステップで使用します。get_step()
関数を使用して、その DelayedReturn
インスタンスから基盤となるステップを取得し、依存関係を入力として add_depends_on
_on を呼び出します。get_step()
関数の定義を表示するには、「sagemaker.workflow.step_outputs.get_stepget_step()
と add_depends_on()
を使用して、preprocess
と train
の依存関係を作成する方法を説明しています。
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
@step
でデコレートした関数から従来のパイプラインステップにデータを渡す
@step
でデコレートしたステップと従来のパイプラインステップを含むパイプラインを作成し、両者間でデータを渡すパイプラインを作成できます。例えば、ProcessingStep
を使用してデータを処理し、その結果を @step
でデコレートしたトレーニング関数に渡すことができます。次の例では、@step
でデコレートしたトレーニングステップは、処理ステップの出力を参照します。
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=
input_data
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a
@step
-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
@step
でデコレートしたステップで ConditionStep
でを使用する
Pipelines は、先行するステップの結果を評価してパイプラインで実行するアクションを決定する、ConditionStep
クラスをサポートしています。ConditionStep
は、@step
でデコレートしたステップでも使用できます。ConditionStep
で任意の @step
でデコレートしたステップの出力を使用するには、そのステップの出力を ConditionStep
の引数として入力します。次の例の条件ステップは、@step
でデコレートしたモデル評価ステップの出力を受け取ります。
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
ステップの DelayedReturn
出力を使用してパイプラインを定義する
パイプラインの定義方法は、@step
デコレータを使用する場合も使用しない場合も同じです。DelayedReturn
インスタンスをパイプラインに渡す場合、パイプラインを構築するためのステップの完全なリストを渡す必要はありません。SDK は、定義した依存関係に基づいて前のステップを自動的に推測します。パイプラインに渡した Step
オブジェクトまたは DelayedReturn
オブジェクトの前のすべてのステップすべてがパイプライングラフに含まれます。次の例では、パイプラインは train
関数の DelayedReturn
オブジェクトを受け取ります。SageMaker AI は、 preprocess
ステップを の前のステップとしてパイプライングラフに追加train
します。
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_train_result], sagemaker_session=<sagemaker-session>
, )
ステップ間にデータやカスタム依存関係がなく、複数のステップを並行して実行する場合、パイプライングラフには複数のリーフノードが含まれます。このようなリーフノードをすべてリストにして、パイプライン定義の steps
引数に渡します。
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session
, )
パイプラインを実行すると、両方のステップが並行して実行されます。
リーフノードには、データまたはカスタム依存関係を通じて定義された以前のすべてのステップに関する情報が含まれているため、パイプラインに渡すのはグラフのリーフノードのみです。パイプラインをコンパイルすると、SageMaker AI はパイプライングラフを形成する後続のすべてのステップも推測し、それぞれを個別のステップとしてパイプラインに追加します。
パイプラインを作成する
次のスニペットに示されるとおり、pipeline.create()
を呼び出してパイプラインを作成します。create()
の詳細については、「sagemaker.workflow.pipeline.Pipeline.create
role = "
pipeline-role
" pipeline.create(role)
を呼び出すとpipeline.create()
、SageMaker AI はパイプラインインスタンスの一部として定義されたすべてのステップをコンパイルします。SageMaker AI は、シリアル化された関数、引数、およびその他のすべてのステップ関連のアーティファクトを Amazon S3 にアップロードします。
データは、次の構造に従って S3 バケットに存在します。
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
は SageMaker AI 設定ファイルで定義され、パイプライン全体に適用されます。定義されていない場合は、デフォルトの SageMaker AI バケットが使用されます。
注記
SageMaker AI がパイプラインをコンパイルするたびに、SageMaker AI はステップのシリアル化された関数、引数、依存関係を現在の時刻のタイムスタンプが付けられたフォルダに保存します。これは、pipeline.create()
、pipeline.update()
、pipeline.upsert()
または pipeline.definition()
を実行する都度、発生します。