Crea una pipeline con funzioni -decorate @step - Amazon SageMaker AI

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Crea una pipeline con funzioni -decorate @step

È possibile creare una pipeline convertendo le funzioni Python in passaggi di pipeline utilizzando il @step decoratore, creando dipendenze tra tali funzioni per creare un grafico di pipeline (o un grafo aciclico diretto (DAG)) e passando i nodi foglia di quel grafico come elenco di passaggi alla pipeline. Le sezioni seguenti spiegano questa procedura in dettaglio con esempi.

Convertire una funzione in un passaggio

Per creare un passaggio utilizzando il @step decoratore, annota la funzione con. @step L'esempio seguente mostra una funzione @step -decorated che preelabora i dati.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)

Quando richiami una funzione @step -decorated, SageMaker AI restituisce un'DelayedReturnistanza invece di eseguire la funzione. Un'DelayedReturnistanza è un proxy per l'effettivo ritorno di quella funzione. L'DelayedReturnistanza può essere passata a un'altra funzione come argomento o direttamente a un'istanza di pipeline come passaggio. Per informazioni sulla DelayedReturn classe, vedete sagemaker.workflow.function_step. DelayedReturn.

Quando crei una dipendenza tra due fasi, crei una connessione tra le fasi nel grafico della pipeline. Le sezioni seguenti introducono diversi modi per creare una dipendenza tra le fasi della pipeline.

Il passaggio dell'DelayedReturnoutput di una funzione come input a un'altra funzione crea automaticamente una dipendenza dai dati nella pipeline. DAG Nell'esempio seguente, il passaggio dell'DelayedReturnoutput della preprocess funzione alla train funzione crea una dipendenza tra e. preprocess train

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)

L'esempio precedente definisce una funzione di allenamento decorata con@step. Quando questa funzione viene richiamata, riceve come input l'DelayedReturnoutput della fase della pipeline di preelaborazione. L'invocazione della funzione di training restituisce un'altra istanza. DelayedReturn Questa istanza contiene le informazioni su tutti i passaggi precedenti definiti in quella funzione (ad esempio, il preprocess passaggio in questo esempio) che formano la pipeline. DAG

Nell'esempio precedente, la preprocess funzione restituisce un singolo valore. Per tipi di restituzione più complessi come elenchi o tuple, consulta. Limitazioni

Nell'esempio precedente, la train funzione ha ricevuto l'DelayedReturnoutput di preprocess e ha creato una dipendenza. Se desideri definire la dipendenza in modo esplicito senza passare l'output del passaggio precedente, usa la add_depends_on funzione con il passaggio. È possibile utilizzare la get_step() funzione per recuperare il passaggio sottostante dalla relativa DelayedReturn istanza e quindi chiamare add_depends_on _on con la dipendenza come input. Per visualizzare la definizione della get_step() funzione, vedete sagemaker.workflow.step_outputs.get_step. L'esempio seguente mostra come creare una dipendenza tra e utilizzando and. preprocess train get_step() add_depends_on()

from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])

È possibile creare una pipeline che include una fase @step decorata e una fase di pipeline tradizionale e trasferisce i dati tra di loro. Ad esempio, è possibile utilizzare per ProcessingStep elaborare i dati e passarne il risultato alla funzione di allenamento @step -decorated. Nell'esempio seguente, una fase di addestramento @step -decorata fa riferimento all'output di una fase di elaborazione.

# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a @step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )

Da utilizzare ConditionStep con gradini @step decorati

Pipelines supporta una ConditionStep classe che valuta i risultati dei passaggi precedenti per decidere quali azioni intraprendere nella pipeline. Puoi usarlo anche ConditionStep con un gradino @step decorato. Per utilizzare l'output di qualsiasi passo @step -decorato conConditionStep, inserisci l'output di quel passaggio come argomento per. ConditionStep Nell'esempio seguente, la fase di condizione riceve l'output della fase di valutazione del modello @step -decorated.

# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )

Definite una pipeline utilizzando l'DelayedReturnoutput dei passaggi

Definisci una pipeline allo stesso modo indipendentemente dal fatto che utilizzi o meno un @step decoratore. Quando si passa un'DelayedReturnistanza alla pipeline, non è necessario passare un elenco completo di passaggi per creare la pipeline. SDKDeduce automaticamente i passaggi precedenti in base alle dipendenze definite. Tutti i passaggi precedenti degli Step oggetti passati alla pipeline o agli DelayedReturn oggetti sono inclusi nel grafico della tubazione. Nell'esempio seguente, la pipeline riceve l'DelayedReturnoggetto per la funzione. train SageMaker L'IA aggiunge il preprocess passaggio, come passaggio precedente ditrain, al grafico della pipeline.

from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )

Se non ci sono dati o dipendenze personalizzate tra i passaggi e si eseguono più passaggi in parallelo, il grafico della pipeline ha più di un nodo foglia. Passa tutti questi nodi foglia in un elenco all'stepsargomento nella definizione della pipeline, come mostrato nell'esempio seguente:

@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )

Quando la pipeline è in esecuzione, entrambi i passaggi vengono eseguiti in parallelo.

Passate solo i nodi foglia del grafico alla pipeline perché i nodi foglia contengono informazioni su tutti i passaggi precedenti definiti tramite dati o dipendenze personalizzate. Quando compila la pipeline, l' SageMaker intelligenza artificiale deduce anche tutti i passaggi successivi che formano il grafico della pipeline e aggiunge ciascuno di essi come passaggio separato alla pipeline.

Crea una pipeline

Crea una pipeline chiamandopipeline.create(), come mostrato nel frammento seguente. Per ulteriori informazioni, vedete create() SageMaker.Workflow.pipeline.pipeline.create.

role = "pipeline-role" pipeline.create(role)

Quando chiami, AI compila tutti i passaggi definiti come parte dell'pipeline.create()istanza della SageMaker pipeline. SageMaker L'intelligenza artificiale carica la funzione serializzata, gli argomenti e tutti gli altri artefatti relativi ai passaggi su Amazon S3.

I dati risiedono nel bucket S3 secondo la seguente struttura:

s3_root_uri/ pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir) step_name/ timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the step execution_id/ step_name/ results # returned output from the serialized function including the model

s3_root_uriè definito nel file di configurazione SageMaker AI e si applica all'intera pipeline. Se non definito, viene utilizzato il bucket SageMaker AI predefinito.

Nota

Ogni volta che l' SageMaker intelligenza artificiale compila una pipeline, l' SageMaker intelligenza artificiale salva le funzioni, gli argomenti e le dipendenze serializzati dei passaggi in una cartella con l'ora corrente. Ciò si verifica ogni volta che si esegue, o. pipeline.create() pipeline.update() pipeline.upsert() pipeline.definition()