Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Crea una pipeline con funzioni -decorate @step
È possibile creare una pipeline convertendo le funzioni Python in passaggi di pipeline utilizzando il @step
decoratore, creando dipendenze tra tali funzioni per creare un grafico di pipeline (o un grafo aciclico diretto (DAG)) e passando i nodi foglia di quel grafico come elenco di passaggi alla pipeline. Le sezioni seguenti spiegano questa procedura in dettaglio con esempi.
Argomenti
Convertire una funzione in un passaggio
Per creare un passaggio utilizzando il @step
decoratore, annota la funzione con. @step
L'esempio seguente mostra una funzione @step
-decorated che preelabora i dati.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
Quando richiami una funzione @step
-decorated, SageMaker AI restituisce un'DelayedReturn
istanza invece di eseguire la funzione. Un'DelayedReturn
istanza è un proxy per l'effettivo ritorno di quella funzione. L'DelayedReturn
istanza può essere passata a un'altra funzione come argomento o direttamente a un'istanza di pipeline come passaggio. Per informazioni sulla DelayedReturn
classe, vedete sagemaker.workflow.function_step. DelayedReturn
Crea dipendenze tra i passaggi
Quando crei una dipendenza tra due fasi, crei una connessione tra le fasi nel grafico della pipeline. Le sezioni seguenti introducono diversi modi per creare una dipendenza tra le fasi della pipeline.
Dipendenze dei dati tramite argomenti di input
Il passaggio dell'DelayedReturn
output di una funzione come input a un'altra funzione crea automaticamente una dipendenza dai dati nella pipeline. DAG Nell'esempio seguente, il passaggio dell'DelayedReturn
output della preprocess
funzione alla train
funzione crea una dipendenza tra e. preprocess
train
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
L'esempio precedente definisce una funzione di allenamento decorata con@step
. Quando questa funzione viene richiamata, riceve come input l'DelayedReturn
output della fase della pipeline di preelaborazione. L'invocazione della funzione di training restituisce un'altra istanza. DelayedReturn
Questa istanza contiene le informazioni su tutti i passaggi precedenti definiti in quella funzione (ad esempio, il preprocess
passaggio in questo esempio) che formano la pipeline. DAG
Nell'esempio precedente, la preprocess
funzione restituisce un singolo valore. Per tipi di restituzione più complessi come elenchi o tuple, consulta. Limitazioni
Definire dipendenze personalizzate
Nell'esempio precedente, la train
funzione ha ricevuto l'DelayedReturn
output di preprocess
e ha creato una dipendenza. Se desideri definire la dipendenza in modo esplicito senza passare l'output del passaggio precedente, usa la add_depends_on
funzione con il passaggio. È possibile utilizzare la get_step()
funzione per recuperare il passaggio sottostante dalla relativa DelayedReturn
istanza e quindi chiamare add_depends_on
_on con la dipendenza come input. Per visualizzare la definizione della get_step()
funzione, vedete sagemaker.workflow.step_outputs.get_step.preprocess
train
get_step()
add_depends_on()
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
Passa i dati da e verso una funzione @step
decorata a una fase di pipeline tradizionale
È possibile creare una pipeline che include una fase @step
decorata e una fase di pipeline tradizionale e trasferisce i dati tra di loro. Ad esempio, è possibile utilizzare per ProcessingStep
elaborare i dati e passarne il risultato alla funzione di allenamento @step
-decorated. Nell'esempio seguente, una fase di addestramento @step
-decorata fa riferimento all'output di una fase di elaborazione.
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=
input_data
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a
@step
-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
Da utilizzare ConditionStep
con gradini @step
decorati
Pipelines supporta una ConditionStep
classe che valuta i risultati dei passaggi precedenti per decidere quali azioni intraprendere nella pipeline. Puoi usarlo anche ConditionStep
con un gradino @step
decorato. Per utilizzare l'output di qualsiasi passo @step
-decorato conConditionStep
, inserisci l'output di quel passaggio come argomento per. ConditionStep
Nell'esempio seguente, la fase di condizione riceve l'output della fase di valutazione del modello @step
-decorated.
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
Definite una pipeline utilizzando l'DelayedReturn
output dei passaggi
Definisci una pipeline allo stesso modo indipendentemente dal fatto che utilizzi o meno un @step
decoratore. Quando si passa un'DelayedReturn
istanza alla pipeline, non è necessario passare un elenco completo di passaggi per creare la pipeline. SDKDeduce automaticamente i passaggi precedenti in base alle dipendenze definite. Tutti i passaggi precedenti degli Step
oggetti passati alla pipeline o agli DelayedReturn
oggetti sono inclusi nel grafico della tubazione. Nell'esempio seguente, la pipeline riceve l'DelayedReturn
oggetto per la funzione. train
SageMaker L'IA aggiunge il preprocess
passaggio, come passaggio precedente ditrain
, al grafico della pipeline.
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_train_result], sagemaker_session=<sagemaker-session>
, )
Se non ci sono dati o dipendenze personalizzate tra i passaggi e si eseguono più passaggi in parallelo, il grafico della pipeline ha più di un nodo foglia. Passa tutti questi nodi foglia in un elenco all'steps
argomento nella definizione della pipeline, come mostrato nell'esempio seguente:
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session
, )
Quando la pipeline è in esecuzione, entrambi i passaggi vengono eseguiti in parallelo.
Passate solo i nodi foglia del grafico alla pipeline perché i nodi foglia contengono informazioni su tutti i passaggi precedenti definiti tramite dati o dipendenze personalizzate. Quando compila la pipeline, l' SageMaker intelligenza artificiale deduce anche tutti i passaggi successivi che formano il grafico della pipeline e aggiunge ciascuno di essi come passaggio separato alla pipeline.
Crea una pipeline
Crea una pipeline chiamandopipeline.create()
, come mostrato nel frammento seguente. Per ulteriori informazioni, vedete create()
SageMaker.Workflow.pipeline.pipeline.create.
role = "
pipeline-role
" pipeline.create(role)
Quando chiami, AI compila tutti i passaggi definiti come parte dell'pipeline.create()
istanza della SageMaker pipeline. SageMaker L'intelligenza artificiale carica la funzione serializzata, gli argomenti e tutti gli altri artefatti relativi ai passaggi su Amazon S3.
I dati risiedono nel bucket S3 secondo la seguente struttura:
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
è definito nel file di configurazione SageMaker AI e si applica all'intera pipeline. Se non definito, viene utilizzato il bucket SageMaker AI predefinito.
Nota
Ogni volta che l' SageMaker intelligenza artificiale compila una pipeline, l' SageMaker intelligenza artificiale salva le funzioni, gli argomenti e le dipendenze serializzati dei passaggi in una cartella con l'ora corrente. Ciò si verifica ogni volta che si esegue, o. pipeline.create()
pipeline.update()
pipeline.upsert()
pipeline.definition()