Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Buat pipa dengan fungsi yang @step
didekorasi
Anda dapat membuat pipeline dengan mengubah fungsi Python menjadi langkah-langkah pipeline menggunakan dekorator, membuat dependensi @step
di antara fungsi-fungsi tersebut untuk membuat grafik pipeline (atau grafik asiklik terarah DAG ()), dan meneruskan simpul daun grafik tersebut sebagai daftar langkah ke pipeline. Bagian berikut menjelaskan prosedur ini secara rinci dengan contoh.
Topik
Ubah fungsi menjadi langkah
Untuk membuat langkah menggunakan @step
dekorator, beri anotasi fungsi dengan. @step
Contoh berikut menunjukkan fungsi @step
-decorated yang memproses data sebelumnya.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
Saat Anda memanggil fungsi @step
-decorated, SageMaker mengembalikan DelayedReturn
instance alih-alih menjalankan fungsi. Sebuah DelayedReturn
instance adalah proxy untuk pengembalian aktual dari fungsi itu. DelayedReturn
Instance dapat diteruskan ke fungsi lain sebagai argumen atau langsung ke instance pipeline sebagai langkah. Untuk informasi tentang DelayedReturn
kelas, lihat sagemaker.workflow.function_step. DelayedReturn
Buat dependensi di antara langkah-langkah
Saat Anda membuat dependensi di antara dua langkah, Anda membuat koneksi antara langkah-langkah dalam grafik pipeline Anda. Bagian berikut memperkenalkan beberapa cara Anda dapat membuat dependensi di antara langkah-langkah pipeline Anda.
Ketergantungan data melalui argumen masukan
Melewati DelayedReturn
output dari satu fungsi sebagai input ke fungsi lain secara otomatis menciptakan ketergantungan data dalam pipelineDAG. Dalam contoh berikut, meneruskan DelayedReturn
output fungsi ke preprocess
train
fungsi menciptakan ketergantungan antara preprocess
dantrain
.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
Contoh sebelumnya mendefinisikan fungsi pelatihan yang dihiasi dengan@step
. Ketika fungsi ini dipanggil, ia menerima DelayedReturn
output dari langkah pipa preprocessing sebagai input. Memanggil fungsi pelatihan mengembalikan DelayedReturn
instance lain. Instance ini menyimpan informasi tentang semua langkah sebelumnya yang didefinisikan dalam fungsi itu (yaitu, preprocess
langkah dalam contoh ini) yang membentuk pipelineDAG.
Pada contoh sebelumnya, preprocess
fungsi mengembalikan nilai tunggal. Untuk jenis pengembalian yang lebih kompleks seperti daftar atau tupel, lihat. Batasan
Tentukan dependensi khusus
Pada contoh sebelumnya, train
fungsi menerima DelayedReturn
output preprocess
dan menciptakan ketergantungan. Jika Anda ingin mendefinisikan ketergantungan secara eksplisit tanpa melewatkan output langkah sebelumnya, gunakan add_depends_on
fungsi dengan langkah tersebut. Anda dapat menggunakan get_step()
fungsi untuk mengambil langkah yang mendasari dari DelayedReturn
instance-nya, dan kemudian memanggil add_depends_on
_on dengan ketergantungan sebagai input. Untuk melihat definisi get_step()
fungsi, lihat sagemaker.workflow.step_outputs.get_steppreprocess
dan train
menggunakan get_step()
danadd_depends_on()
.
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
Meneruskan data ke dan dari fungsi @step
yang didekorasi ke langkah pipa tradisional
Anda dapat membuat pipa yang mencakup langkah yang @step
didekorasi dan langkah pipa tradisional dan meneruskan data di antara mereka. Misalnya, Anda dapat menggunakan ProcessingStep
untuk memproses data dan meneruskan hasilnya ke fungsi pelatihan @step
yang didekorasi. Dalam contoh berikut, langkah pelatihan yang @step
didekorasi mereferensikan output dari langkah pemrosesan.
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=
input_data
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a
@step
-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
Gunakan ConditionStep
dengan langkah-langkah @step
yang didekorasi
Pipelines mendukung ConditionStep
kelas yang mengevaluasi hasil dari langkah-langkah sebelumnya untuk memutuskan tindakan apa yang harus diambil dalam pipeline. Anda dapat menggunakan ConditionStep
dengan langkah @step
yang didekorasi juga. Untuk menggunakan output dari setiap langkah @step
yang didekorasi denganConditionStep
, masukkan output dari langkah itu sebagai argumen untukConditionStep
. Dalam contoh berikut, langkah kondisi menerima output dari langkah evaluasi model @step
yang didekorasi.
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
Tentukan pipeline menggunakan DelayedReturn
output dari langkah-langkah
Anda mendefinisikan pipeline dengan cara yang sama apakah Anda menggunakan @step
dekorator atau tidak. Ketika Anda meneruskan DelayedReturn
instance ke pipeline Anda, Anda tidak perlu melewati daftar lengkap langkah-langkah untuk membangun pipeline. SDKSecara otomatis menyimpulkan langkah-langkah sebelumnya berdasarkan dependensi yang Anda tentukan. Semua langkah sebelumnya dari Step
objek yang Anda lewatkan ke pipa atau DelayedReturn
objek termasuk dalam grafik pipa. Dalam contoh berikut, pipa menerima DelayedReturn
objek untuk train
fungsi tersebut. SageMaker menambahkan preprocess
langkah, sebagai langkah sebelumnyatrain
, ke grafik pipeline.
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_train_result], sagemaker_session=<sagemaker-session>
, )
Jika tidak ada data atau dependensi khusus di antara langkah-langkah dan Anda menjalankan beberapa langkah secara paralel, grafik pipeline memiliki lebih dari satu simpul daun. Berikan semua node daun ini dalam daftar ke steps
argumen dalam definisi pipeline Anda, seperti yang ditunjukkan pada contoh berikut:
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session
, )
Saat pipeline berjalan, kedua langkah berjalan secara paralel.
Anda hanya meneruskan simpul daun grafik ke pipeline karena simpul daun berisi informasi tentang semua langkah sebelumnya yang ditentukan melalui data atau dependensi khusus. Ketika mengkompilasi pipa, SageMaker juga menyimpulkan semua langkah selanjutnya yang membentuk grafik pipa dan menambahkan masing-masing sebagai langkah terpisah ke pipa.
Buat pipeline
Buat pipeline dengan memanggilpipeline.create()
, seperti yang ditunjukkan pada cuplikan berikut. Untuk detailnyacreate()
, lihat SageMaker.Workflow.Pipeline.Pipeline.Create
role = "
pipeline-role
" pipeline.create(role)
Saat Anda memanggilpipeline.create()
, SageMaker mengkompilasi semua langkah yang didefinisikan sebagai bagian dari instance pipeline. SageMaker mengunggah fungsi serial, argumen, dan semua artefak terkait langkah lainnya ke Amazon S3.
Data berada di bucket S3 sesuai dengan struktur berikut:
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
didefinisikan dalam file SageMaker konfigurasi dan berlaku untuk seluruh pipeline. Jika tidak terdefinisi, SageMaker bucket default digunakan.
catatan
Setiap kali SageMaker mengkompilasi pipeline, SageMaker menyimpan fungsi, argumen, dan dependensi serial langkah-langkah dalam folder yang diberi cap waktu dengan waktu saat ini. Ini terjadi setiap kali Anda berlaripipeline.create()
,pipeline.update()
, pipeline.upsert()
ataupipeline.definition()
.