Buat pipa dengan fungsi yang @step didekorasi - Amazon SageMaker

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Buat pipa dengan fungsi yang @step didekorasi

Anda dapat membuat pipeline dengan mengubah fungsi Python menjadi langkah-langkah pipeline menggunakan dekorator, membuat dependensi @step di antara fungsi-fungsi tersebut untuk membuat grafik pipeline (atau grafik asiklik terarah DAG ()), dan meneruskan simpul daun grafik tersebut sebagai daftar langkah ke pipeline. Bagian berikut menjelaskan prosedur ini secara rinci dengan contoh.

Ubah fungsi menjadi langkah

Untuk membuat langkah menggunakan @step dekorator, beri anotasi fungsi dengan. @step Contoh berikut menunjukkan fungsi @step -decorated yang memproses data sebelumnya.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)

Saat Anda memanggil fungsi @step -decorated, SageMaker mengembalikan DelayedReturn instance alih-alih menjalankan fungsi. Sebuah DelayedReturn instance adalah proxy untuk pengembalian aktual dari fungsi itu. DelayedReturnInstance dapat diteruskan ke fungsi lain sebagai argumen atau langsung ke instance pipeline sebagai langkah. Untuk informasi tentang DelayedReturn kelas, lihat sagemaker.workflow.function_step. DelayedReturn.

Saat Anda membuat dependensi di antara dua langkah, Anda membuat koneksi antara langkah-langkah dalam grafik pipeline Anda. Bagian berikut memperkenalkan beberapa cara Anda dapat membuat dependensi di antara langkah-langkah pipeline Anda.

Melewati DelayedReturn output dari satu fungsi sebagai input ke fungsi lain secara otomatis menciptakan ketergantungan data dalam pipelineDAG. Dalam contoh berikut, meneruskan DelayedReturn output fungsi ke preprocess train fungsi menciptakan ketergantungan antara preprocess dantrain.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)

Contoh sebelumnya mendefinisikan fungsi pelatihan yang dihiasi dengan@step. Ketika fungsi ini dipanggil, ia menerima DelayedReturn output dari langkah pipa preprocessing sebagai input. Memanggil fungsi pelatihan mengembalikan DelayedReturn instance lain. Instance ini menyimpan informasi tentang semua langkah sebelumnya yang didefinisikan dalam fungsi itu (yaitu, preprocess langkah dalam contoh ini) yang membentuk pipelineDAG.

Pada contoh sebelumnya, preprocess fungsi mengembalikan nilai tunggal. Untuk jenis pengembalian yang lebih kompleks seperti daftar atau tupel, lihat. Batasan

Pada contoh sebelumnya, train fungsi menerima DelayedReturn output preprocess dan menciptakan ketergantungan. Jika Anda ingin mendefinisikan ketergantungan secara eksplisit tanpa melewatkan output langkah sebelumnya, gunakan add_depends_on fungsi dengan langkah tersebut. Anda dapat menggunakan get_step() fungsi untuk mengambil langkah yang mendasari dari DelayedReturn instance-nya, dan kemudian memanggil add_depends_on _on dengan ketergantungan sebagai input. Untuk melihat definisi get_step() fungsi, lihat sagemaker.workflow.step_outputs.get_step. Contoh berikut menunjukkan cara membuat ketergantungan antara preprocess dan train menggunakan get_step() danadd_depends_on().

from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])

Anda dapat membuat pipa yang mencakup langkah yang @step didekorasi dan langkah pipa tradisional dan meneruskan data di antara mereka. Misalnya, Anda dapat menggunakan ProcessingStep untuk memproses data dan meneruskan hasilnya ke fungsi pelatihan @step yang didekorasi. Dalam contoh berikut, langkah pelatihan yang @step didekorasi mereferensikan output dari langkah pemrosesan.

# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a @step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )

Gunakan ConditionStep dengan langkah-langkah @step yang didekorasi

Pipelines mendukung ConditionStep kelas yang mengevaluasi hasil dari langkah-langkah sebelumnya untuk memutuskan tindakan apa yang harus diambil dalam pipeline. Anda dapat menggunakan ConditionStep dengan langkah @step yang didekorasi juga. Untuk menggunakan output dari setiap langkah @step yang didekorasi denganConditionStep, masukkan output dari langkah itu sebagai argumen untukConditionStep. Dalam contoh berikut, langkah kondisi menerima output dari langkah evaluasi model @step yang didekorasi.

# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )

Tentukan pipeline menggunakan DelayedReturn output dari langkah-langkah

Anda mendefinisikan pipeline dengan cara yang sama apakah Anda menggunakan @step dekorator atau tidak. Ketika Anda meneruskan DelayedReturn instance ke pipeline Anda, Anda tidak perlu melewati daftar lengkap langkah-langkah untuk membangun pipeline. SDKSecara otomatis menyimpulkan langkah-langkah sebelumnya berdasarkan dependensi yang Anda tentukan. Semua langkah sebelumnya dari Step objek yang Anda lewatkan ke pipa atau DelayedReturn objek termasuk dalam grafik pipa. Dalam contoh berikut, pipa menerima DelayedReturn objek untuk train fungsi tersebut. SageMaker menambahkan preprocess langkah, sebagai langkah sebelumnyatrain, ke grafik pipeline.

from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )

Jika tidak ada data atau dependensi khusus di antara langkah-langkah dan Anda menjalankan beberapa langkah secara paralel, grafik pipeline memiliki lebih dari satu simpul daun. Berikan semua node daun ini dalam daftar ke steps argumen dalam definisi pipeline Anda, seperti yang ditunjukkan pada contoh berikut:

@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )

Saat pipeline berjalan, kedua langkah berjalan secara paralel.

Anda hanya meneruskan simpul daun grafik ke pipeline karena simpul daun berisi informasi tentang semua langkah sebelumnya yang ditentukan melalui data atau dependensi khusus. Ketika mengkompilasi pipa, SageMaker juga menyimpulkan semua langkah selanjutnya yang membentuk grafik pipa dan menambahkan masing-masing sebagai langkah terpisah ke pipa.

Buat pipeline

Buat pipeline dengan memanggilpipeline.create(), seperti yang ditunjukkan pada cuplikan berikut. Untuk detailnyacreate(), lihat SageMaker.Workflow.Pipeline.Pipeline.Create.

role = "pipeline-role" pipeline.create(role)

Saat Anda memanggilpipeline.create(), SageMaker mengkompilasi semua langkah yang didefinisikan sebagai bagian dari instance pipeline. SageMaker mengunggah fungsi serial, argumen, dan semua artefak terkait langkah lainnya ke Amazon S3.

Data berada di bucket S3 sesuai dengan struktur berikut:

s3_root_uri/ pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir) step_name/ timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the step execution_id/ step_name/ results # returned output from the serialized function including the model

s3_root_urididefinisikan dalam file SageMaker konfigurasi dan berlaku untuk seluruh pipeline. Jika tidak terdefinisi, SageMaker bucket default digunakan.

catatan

Setiap kali SageMaker mengkompilasi pipeline, SageMaker menyimpan fungsi, argumen, dan dependensi serial langkah-langkah dalam folder yang diberi cap waktu dengan waktu saat ini. Ini terjadi setiap kali Anda berlaripipeline.create(),pipeline.update(), pipeline.upsert() ataupipeline.definition().