Fasi di Amazon SageMaker Model Building Pipelines - Amazon SageMaker

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Fasi di Amazon SageMaker Model Building Pipelines

SageMaker Le pipeline sono composte da passaggi. Queste fasi definiscono le operazioni intraprese dalla pipeline e le relazioni tra le fasi utilizzando le proprietà.

Tipo di fase

Di seguito vengono descritti i requisiti di ogni tipo di fase e viene fornito un esempio di implementazione della fase. Queste non sono implementazioni funzionanti perché non forniscono le risorse e gli input necessari. Per un tutorial che implementa queste fasi, consulta Creare e gestire le pipeline SageMaker .

Nota

Puoi anche creare un passaggio dal tuo codice di machine learning locale convertendolo in un passaggio SageMaker Pipelines con il decoratore. @step Per ulteriori informazioni, consulta decoratore @step.

Amazon SageMaker Model Building Pipelines supporta i seguenti tipi di passaggi:

decoratore @step

Puoi creare un passaggio dal codice di machine learning locale usando il @step decoratore. Dopo aver testato il codice, puoi convertire la funzione in una fase della SageMaker pipeline annotandola con il decoratore. @step SageMaker Pipelines crea ed esegue una pipeline quando passate l'output della funzione @step -decorated come passaggio alla pipeline. È inoltre possibile creare una DAG pipeline in più fasi che includa una o più funzioni @step decorate oltre ai passaggi della pipeline tradizionali. SageMaker Per ulteriori dettagli su come creare una fase con @step decorator, consulta. Codice L ift-and-shift Python con il decoratore @step

Fase di elaborazione

Utilizza una fase di elaborazione per creare un processo di elaborazione per l'elaborazione dei dati. Per ulteriori informazioni sull'elaborazione dei processi, consulta Elaborazione di dati e valutazione di modelli.

Una fase di elaborazione richiede un processore, uno script Python che definisce il codice di elaborazione, gli output per l'elaborazione e gli argomenti del processo. L'esempio seguente mostra come creare una definizione di ProcessingStep.

from sagemaker.sklearn.processing import SKLearnProcessor sklearn_processor = SKLearnProcessor(framework_version='1.0-1', role=<role>, instance_type='ml.m5.xlarge', instance_count=1)
from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep inputs = [ ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] step_process = ProcessingStep( name="AbaloneProcess", step_args = sklearn_processor.run(inputs=inputs, outputs=outputs, code="abalone/preprocessing.py") )

Passaggio di parametri di runtime

L'esempio seguente mostra come passare i parametri di runtime da un PySpark processore a unProcessingStep.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.spark.processing import PySparkProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep pipeline_session = PipelineSession() pyspark_processor = PySparkProcessor( framework_version='2.4', role=<role>, instance_type='ml.m5.xlarge', instance_count=1, sagemaker_session=pipeline_session, ) step_args = pyspark_processor.run( inputs=[ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"),], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="preprocess.py", arguments=None, ) step_process = ProcessingStep( name="AbaloneProcess", step_args=step_args, )

Per ulteriori informazioni sui requisiti delle fasi di elaborazione, consulta sagemaker.workflow.steps. ProcessingStepdocumentazione. Per un esempio approfondito, consulta il notebook di esempio Orchestrate Jobs to Train and Evaluate Models with Amazon SageMaker Pipelines. La sezione Define a Processing Step for Feature Engineering include ulteriori informazioni.

Fase di addestramento

Si utilizza una fase di addestramento per creare un processo di addestramento per addestrare un modello. Per ulteriori informazioni sui lavori di formazione, consulta Train a Model with Amazon SageMaker.

Una fase di addestramento richiede uno strumento di valutazione e input di dati di addestramento e convalida. L'esempio seguente mostra come creare una definizione di TrainingStep. Per ulteriori informazioni sui requisiti delle fasi di formazione, consulta sagemaker.workflow.steps. TrainingStepdocumentazione.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep from sagemaker.xgboost.estimator import XGBoost pipeline_session = PipelineSession() xgb_estimator = XGBoost(..., sagemaker_session=pipeline_session) step_args = xgb_estimator.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) } ) step_train = TrainingStep( name="TrainAbaloneModel", step_args=step_args, )

Fase di ottimizzazione

Si utilizza una fase di ottimizzazione per creare un processo di ottimizzazione degli iperparametri, noto anche come ottimizzazione degli iperparametri (). HPO Un processo di ottimizzazione iperparametrica esegue più lavori di formazione, ciascuno dei quali produce una versione del modello. Per ulteriori informazioni sull'ottimizzazione degli iperparametri, consulta Esegui l'ottimizzazione automatica del modello con SageMaker.

Il lavoro di ottimizzazione è associato all' SageMaker esperimento per la pipeline, mentre i lavori di formazione vengono creati come prove. Per ulteriori informazioni, consulta Integrazione di Esperimenti.

Una fase di ottimizzazione richiede un input di formazione HyperparameterTuner. È possibile addestrare nuovamente processi di ottimizzazione precedenti specificando il parametro warm_start_config del HyperparameterTuner. Per ulteriori informazioni sull'ottimizzazione degli iperparametri e sull'avvio a caldo, consulta Eseguire un processo di ottimizzazione degli iperparametri con avvio a caldo.

Utilizzate il metodo get_top_model_s3_uri di sagemaker.workflow.steps. TuningStepclasse per ottenere l'artefatto del modello da una delle versioni del modello con le migliori prestazioni. Per un taccuino che mostra come utilizzare una fase di ottimizzazione in una SageMaker pipeline, vedete .ipynb. sagemaker-pipelines-tuning-step

Importante

Le fasi di ottimizzazione sono state introdotte in Amazon SageMaker SDK Python v2.48.0 e SageMaker Amazon Studio Classic v3.8.0. È necessario aggiornare Studio Classic prima di utilizzare una fase di ottimizzazione o la pipeline non viene visualizzata. DAG Per aggiornare Studio Classic, vedi. Chiudi e aggiorna SageMaker Studio Classic

L'esempio seguente mostra come creare una definizione di TuningStep.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.tuner import HyperparameterTuner from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TuningStep tuner = HyperparameterTuner(..., sagemaker_session=PipelineSession()) step_tuning = TuningStep( name = "HPTuning", step_args = tuner.fit(inputs=TrainingInput(s3_data="s3://my-bucket/my-data")) )

Ottenere la versione migliore del modello

Nell'esempio seguente viene illustrato come ottenere la versione migliore del modello dal processo di ottimizzazione mediante il metodo get_top_model_s3_uri. Al massimo, sono disponibili le 50 versioni con le migliori prestazioni, classificate in base a HyperParameterTuningJobObjective. L'argomento top_k è un indice delle versioni, dove top_k=0 indica la versione con le migliori prestazioni e top_k=49 quella con le prestazioni peggiori.

best_model = Model( image_uri=image_uri, model_data=step_tuning.get_top_model_s3_uri( top_k=0, s3_bucket=sagemaker_session.default_bucket() ), ... )

Per ulteriori informazioni sui requisiti delle fasi di ottimizzazione, consulta sagemaker.workflow.steps. TuningStepdocumentazione.

Fase AutoML

Utilizzate AutoML API per creare un lavoro AutoML per addestrare automaticamente un modello. Per ulteriori informazioni sui job AutoML, consulta Automatizza lo sviluppo di modelli con Amazon Autopilot. SageMaker

Nota

Attualmente, la fase AutoML supporta solo la modalità di addestramento di raggruppamento.

L'esempio seguente mostra come creare una definizione utilizzando AutoMLStep.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.automl_step import AutoMLStep pipeline_session = PipelineSession() auto_ml = AutoML(..., role="<role>", target_attribute_name="my_target_attribute_name", mode="ENSEMBLING", sagemaker_session=pipeline_session) input_training = AutoMLInput( inputs="s3://my-bucket/my-training-data", target_attribute_name="my_target_attribute_name", channel_type="training", ) input_validation = AutoMLInput( inputs="s3://my-bucket/my-validation-data", target_attribute_name="my_target_attribute_name", channel_type="validation", ) step_args = auto_ml.fit( inputs=[input_training, input_validation] ) step_automl = AutoMLStep( name="AutoMLStep", step_args=step_args, )

Ottenere la versione migliore del modello

La fase AutoML addestra automaticamente diversi candidati modello. Ottenete il modello con la migliore metrica oggettiva dal job AutoML utilizzando get_best_auto_ml_model il metodo seguente. È inoltre necessario utilizzare an IAM role per accedere agli artefatti del modello.

best_model = step_automl.get_best_auto_ml_model(role=<role>)

Per ulteriori informazioni, vedete il passaggio AutoML in Python SageMaker . SDK

Fase del modello

Utilizzate ModelStep a per creare o registrare un SageMaker modello. Per ulteriori informazioni sui ModelStep requisiti, consultate sagemaker.workflow.model_step. ModelStepdocumentazione.

Creazione di un modello

È possibile utilizzare a ModelStep per creare un SageMaker modello. A ModelStep richiede elementi del modello e informazioni sul tipo di SageMaker istanza da utilizzare per creare il modello. Per ulteriori informazioni sui SageMaker modelli, consulta Train a Model with Amazon SageMaker.

L'esempio seguente mostra come creare una definizione di ModelStep.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.model import Model from sagemaker.workflow.model_step import ModelStep step_train = TrainingStep(...) model = Model( image_uri=pytorch_estimator.training_image_uri(), model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=PipelineSession(), role=role, ) step_model_create = ModelStep( name="MyModelCreationStep", step_args=model.create(instance_type="ml.m5.xlarge"), )

Registrazione di un modello

Puoi usare ModelStep a per registrare un sagemaker.model.Model o un sagemaker.pipeline.PipelineModel nel registro dei SageMaker modelli Amazon. Un PipelineModel rappresenta una pipeline di inferenza, che è un modello composto da una sequenza lineare di container che elaborano richieste di inferenza. Per ulteriori informazioni su come eseguire la registrazione di un modello, consulta Registrazione e implementazione di modelli con il Registro dei modelli.

L'esempio seguente mostra come creare una ModelStep per registrare un PipelineModel.

import time from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel pipeline_session = PipelineSession() code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel( model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=pipeline_session, py_version='py3' ) xgboost_model = XGBoostModel( model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=pipeline_session, role=role ) from sagemaker.workflow.model_step import ModelStep from sagemaker import PipelineModel pipeline_model = PipelineModel( models=[sklearn_model, xgboost_model], role=role,sagemaker_session=pipeline_session, ) register_model_step_args = pipeline_model.register( content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', ) step_model_registration = ModelStep( name="AbaloneRegisterModel", step_args=register_model_step_args, )

CreateModel passo

Importante

Si consiglia di utilizzare Fase del modello per creare modelli a partire dalla v2.90.0 di Python. SageMaker SDK CreateModelStepcontinuerà a funzionare nelle versioni precedenti di SageMaker PythonSDK, ma non è più supportato attivamente.

Si utilizza un CreateModel passaggio per creare un SageMaker modello. Per ulteriori informazioni sui SageMaker modelli, consulta Train a Model with Amazon SageMaker.

Una fase di creazione del modello richiede artefatti del modello e informazioni sul tipo di SageMaker istanza da utilizzare per creare il modello. L'esempio seguente mostra come creare una definizione di fase CreateModel. Per ulteriori informazioni sui requisiti dei CreateModel passaggi, consultate sagemaker.workflow.steps. CreateModelStepdocumentazione.

from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=best_model, inputs=inputs )

RegisterModel passo

Importante

Si consiglia di utilizzare Fase del modello per registrare i modelli a partire dalla v2.90.0 di Python. SageMaker SDK RegisterModelcontinuerà a funzionare nelle versioni precedenti di SageMaker PythonSDK, ma non è più supportato attivamente.

Si utilizza un RegisterModel passaggio per registrare un SageMaker.model.model o un sagemaker.pipeline. PipelineModelcon il registro dei SageMaker modelli Amazon. Un PipelineModel rappresenta una pipeline di inferenza, che è un modello composto da una sequenza lineare di container che elaborano richieste di inferenza.

Per ulteriori informazioni su come eseguire la registrazione di un modello, consulta Registrazione e implementazione di modelli con il Registro dei modelli. Per ulteriori informazioni sui requisiti dei RegisterModel passaggi, consulta sagemaker.workflow.step_collections. RegisterModeldocumentazione.

L'esempio seguente mostra come creare una fase RegisterModel per registrare un PipelineModel.

import time from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel(model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=sagemaker_session, py_version='py3') xgboost_model = XGBoostModel(model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=sagemaker_session, role=role) from sagemaker.workflow.step_collections import RegisterModel from sagemaker import PipelineModel pipeline_model = PipelineModel(models=[sklearn_model,xgboost_model],role=role,sagemaker_session=sagemaker_session) step_register = RegisterModel( name="AbaloneRegisterModel", model=pipeline_model, content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', )

Se model non viene fornito, la fase di registrazione del modello richiede uno strumento di valutazione come mostrato nell'esempio seguente.

from sagemaker.workflow.step_collections import RegisterModel step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

Fase di trasformazione

Si utilizza una fase di trasformazione in modo che la trasformazione in batch esegua inferenza su un intero set di dati. Per ulteriori informazioni sulla trasformazione in batch, consulta Esecuzione di trasformazioni in batch con pipeline di inferenza.

Una fase di trasformazione richiede un trasformatore e i dati su cui eseguire la trasformazione in batch. L'esempio seguente mostra come creare una definizione di fase Transform. Per ulteriori informazioni sui requisiti delle Transform fasi, consulta sagemaker.workflow.steps. TransformStepdocumentazione.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.transformer import Transformer from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep transformer = Transformer(..., sagemaker_session=PipelineSession()) step_transform = TransformStep( name="AbaloneTransform", step_args=transformer.transform(data="s3://my-bucket/my-data"), )

Fase di condizione

Si utilizza una fase condizionale per valutare la condizione delle proprietà della fase per valutare quale azione deve essere intrapresa successivamente nella pipeline.

Una fase di condizione richiede:

  • Un elenco di condizioni.

  • Un elenco di passaggi da eseguire se la condizione restituisce. true

  • Un elenco di passaggi da eseguire se la condizione restituisce. false

L'esempio seguente mostra come creare una definizione di ConditionStep.

Limitazioni

  • SageMaker Pipelines non supporta l'uso di fasi di condizione annidate. Non è possibile passare una fase condizionale come input per un'altra fase condizionale.

  • Una fase condizionale non può utilizzare fasi identiche in entrambi i rami. Se hai bisogno della stessa funzionalità di fase in entrambi i rami, duplica la fase e assegnale un nome diverso.

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 ) step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[] )

Per ulteriori informazioni sui ConditionStep requisiti, consulta sagemaker.workflow.condition_step. ConditionStepAPIriferimento. Per ulteriori informazioni sulle condizioni supportate, consulta Amazon SageMaker Model Building Pipelines - Conditions nella documentazione di SageMaker SDK Python.

Fase di callback

Utilizza un Callback passaggio per aggiungere processi e AWS servizi aggiuntivi al tuo flusso di lavoro che non sono forniti direttamente da Amazon SageMaker Model Building Pipelines. Quando viene eseguita una fase di Callback, si verifica la seguente procedura:

  • SageMaker Pipelines invia un messaggio a una coda Amazon Simple Queue Service (AmazonSQS) specificata dal cliente. Il messaggio contiene un token SageMaker generato da Pipelines e un elenco di parametri di input fornito dal cliente. Dopo aver inviato il messaggio, SageMaker Pipelines attende una risposta dal cliente.

  • Il cliente recupera il messaggio dalla SQS coda di Amazon e avvia la procedura personalizzata.

  • Al termine del processo, il cliente chiama una delle seguenti persone APIs e invia il token generato da Pipelines: SageMaker

  • La API chiamata fa sì che SageMaker Pipelines continui il processo della pipeline o fallisca il processo.

Per ulteriori informazioni sui requisiti delle fasi, consulta Callback sagemaker.workflow.callback_step. CallbackStepdocumentazione. Per una soluzione completa, consulta Extend SageMaker Pipelines per includere passaggi personalizzati utilizzando passaggi di callback.

Importante

Callbacki passaggi sono stati introdotti in Amazon SageMaker Python SDK v2.45.0 e Amazon SageMaker Studio Classic v3.6.2. È necessario aggiornare Studio Classic prima di utilizzare un Callback passaggio o la pipeline non viene visualizzata. DAG Per aggiornare Studio Classic, vediChiudi e aggiorna SageMaker Studio Classic.

L'esempio seguente mostra un'implementazione della procedura precedente.

from sagemaker.workflow.callback_step import CallbackStep step_callback = CallbackStep( name="MyCallbackStep", sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue", inputs={...}, outputs=[...] ) callback_handler_code = ' import boto3 import json def handler(event, context): sagemaker_client=boto3.client("sagemaker") for record in event["Records"]: payload=json.loads(record["body"]) token=payload["token"] # Custom processing # Call SageMaker to complete the step sagemaker_client.send_pipeline_execution_step_success( CallbackToken=token, OutputParameters={...} ) '
Nota

I parametri di output per CallbackStep non devono essere nidificati. Ad esempio, se si utilizza un dizionario nidificato come parametro di output, il dizionario viene trattato come una singola stringa (ad es. {"output1": "{\"nested_output1\":\"my-output\"}"}). Se si fornisce un valore annidato, quando si tenta di fare riferimento a un particolare parametro di output, viene SageMaker generato un errore irreversibile del client.

Comportamento di arresto

Un processo di pipeline non si arresta durante l'esecuzione di una fase di Callback.

Quando StopPipelineExecutionrichiami un processo di pipeline con una Callback fase in esecuzione, SageMaker Pipelines invia un SQS messaggio Amazon alla SQS coda. Il corpo del SQS messaggio contiene un campo Status, impostato su. Stopping Di seguito viene illustrato un esempio di corpo del SQS messaggio.

{ "token": "26vcYbeWsZ", "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a", "arguments": { "number": 5, "stringArg": "some-arg", "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv" }, "status": "Stopping" }

Devi aggiungere una logica al tuo Amazon SQS Message Consumer per intraprendere tutte le azioni necessarie (ad esempio, la pulizia delle risorse) alla ricezione del messaggio. Quindi aggiungi una chiamata a SendPipelineExecutionStepSuccess oSendPipelineExecutionStepFailure.

Solo quando SageMaker Pipelines riceve una di queste chiamate interrompe il processo della pipeline.

Gradino Lambda

Si utilizza un passaggio Lambda per eseguire una AWS Lambda funzione. È possibile eseguire una funzione Lambda esistente oppure SageMaker creare ed eseguire una nuova funzione Lambda. Per un taccuino che mostra come usare una fase Lambda in una SageMaker pipeline, consulta .ipynb. sagemaker-pipelines-lambda-step

Importante

Le fasi Lambda sono state introdotte in Amazon SageMaker Python v2.51.0 e Amazon Studio Classic v3.9.1SDK. SageMaker È necessario aggiornare Studio Classic prima di utilizzare un passaggio Lambda o la pipeline DAG non viene visualizzata. Per aggiornare Studio Classic, vedi. Chiudi e aggiorna SageMaker Studio Classic

SageMaker fornisce la classe SageMaker.lambda_helper.lambda per creare, aggiornare, richiamare ed eliminare funzioni Lambda. Lambdaha la seguente firma.

Lambda( function_arn, # Only required argument to invoke an existing Lambda function # The following arguments are required to create a Lambda function: function_name, execution_role_arn, zipped_code_dir, # Specify either zipped_code_dir and s3_bucket, OR script s3_bucket, # S3 bucket where zipped_code_dir is uploaded script, # Path of Lambda function script handler, # Lambda handler specified as "lambda_script.lambda_handler" timeout, # Maximum time the Lambda function can run before the lambda step fails ... )

Il sagemaker.workflow.lambda_step. LambdaStepla classe ha un argomento di tipo. lambda_func Lambda Per richiamare una funzione Lambda esistente, l'unico requisito è fornire l'Amazon Resource Name ARN () della funzione a. function_arn Se non fornisci un valore per function_arn, devi specificare handler e uno dei seguenti parametri:

  • zipped_code_dir: il percorso della funzione Lambda compressa

    s3_bucket: bucket Amazon S3 dove deve essere caricato zipped_code_dir

  • script: il percorso del file di script della funzione Lambda

L'esempio seguente mostra come creare una definizione di fase Lambda che richiami una funzione Lambda esistente.

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda" ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

L'esempio seguente mostra come creare una definizione di fase Lambda che crei e richiami una funzione Lambda utilizzando uno script di funzione Lambda.

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_name="split-dataset-lambda", execution_role_arn=execution_role_arn, script="lambda_script.py", handler="lambda_script.lambda_handler", ... ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

Input e output

Se la funzione Lambda ha input o output, anche questi devono essere definiti nella fase Lambda.

Nota

I parametri di input e output non devono essere nidificati. Ad esempio, se si utilizza un dizionario nidificato come parametro di output, il dizionario viene trattato come una singola stringa (ad es. {"output1": "{\"nested_output1\":\"my-output\"}"}). Se si fornisce un valore nidificato e si tenta di farvi riferimento in un secondo momento, viene generato un errore irreversibile del client.

Quando si definisce la fase Lambda, inputs deve essere un dizionario di coppie chiave-valore. Ogni valore del dizionario inputs deve essere di tipo primitivo (stringa, numero intero o float). Gli oggetti nidificati non sono supportati. Se non viene definito, il valore inputs predefinito è None.

Il valore outputs deve essere un elenco di chiavi. Queste chiavi si riferiscono a un dizionario definito nell'output della funzione Lambda. Come per inputs, queste chiavi devono essere di tipo primitivo e gli oggetti nidificati non sono supportati.

Comportamento di timeout e arresto

La classe Lambda ha un argomento timeout che specifica il tempo massimo di esecuzione della funzione Lambda. Il valore predefinito è 120 secondi con un valore massimo di 10 minuti. Se la funzione Lambda è in esecuzione quando viene raggiunto il timeout, la fase Lambda ha esito negativo; tuttavia, la funzione Lambda continua a essere in esecuzione.

Un processo di pipeline non può essere arrestato mentre è in esecuzione una fase Lambda perché la funzione Lambda richiamata dalla fase Lambda non può essere arrestata. Se si interrompe il processo mentre la funzione Lambda è in esecuzione, la pipeline attende il completamento della funzione o il raggiungimento del timeout. Dipende dall'evento che si verifica per primo. Il processo si interrompe quindi. Se la funzione Lambda termina, lo stato del processo di pipeline è Stopped. Se viene raggiunto il timeout, lo stato del processo di pipeline è Failed.

ClarifyCheck passo

Si può utilizzare la fase ClarifyCheck per eseguire controlli della deviazione dalla baseline rispetto alle baseline precedenti per l'analisi dei bias e la spiegabilità del modello. È quindi possibile generare e registrare le proprie baseline con il metodo model.register() e passare l'output di tale metodo a Fase del modello utilizzando step_args. Queste linee di base per il controllo della deriva possono essere utilizzate da Amazon SageMaker Model Monitor per gli endpoint del modello. Di conseguenza, non è necessario fornire separatamente un suggerimento di base.

La fase ClarifyCheck può anche estrarre le baseline per il controllo della deviazione dal registro dei modelli. La ClarifyCheck fase utilizza il contenitore predefinito SageMaker Clarify. Questo contenitore offre una gamma di funzionalità di monitoraggio dei modelli, tra cui la proposta di vincoli e la convalida dei vincoli rispetto a una determinata linea di base. Per ulteriori informazioni, consulta Inizia con un contenitore Clarify SageMaker .

Configurazione della fase ClarifyCheck

È possibile configurare la fase ClarifyCheck per eseguire solo uno dei seguenti tipi di controllo ogni volta che viene utilizzata in una pipeline.

  • Controllo dei bias nei dati

  • Controllo dei bias nel modello

  • Controllo della spiegabilità del modello

A tale scopo, impostate il clarify_check_config parametro con uno dei seguenti valori del tipo di controllo:

  • DataBiasCheckConfig

  • ModelBiasCheckConfig

  • ModelExplainabilityCheckConfig

La ClarifyCheck fase avvia un processo di elaborazione che esegue il contenitore predefinito SageMaker Clarify e richiede configurazioni dedicate per il controllo e il processo di elaborazione. ClarifyCheckConfige CheckJobConfig sono funzioni di supporto per queste configurazioni. Queste funzioni di supporto sono in linea con il modo in cui viene calcolato il processo di elaborazione di SageMaker Clarify per verificare la distorsione del modello, la distorsione dei dati o la spiegabilità del modello. Per ulteriori informazioni, consulta Esegui SageMaker Clarify Processing Jobs per l'analisi e la spiegabilità dei pregiudizi.

Controllo dei comportamenti della fase per il controllo della deviazione

La fase ClarifyCheck richiede i seguenti due flag booleani per controllarne il comportamento:

  • skip_check: questo parametro indica se il controllo della deriva rispetto alla baseline precedente viene saltato o meno. Se è impostato su False, deve essere disponibile la baseline precedente del tipo di controllo configurato.

  • register_new_baseline: questo parametro indica se è possibile accedere a una baseline appena calcolata tramite la proprietà di fase BaselineUsedForDriftCheckConstraints. Se è impostato su False, deve essere disponibile anche la baseline precedente del tipo di controllo configurato. È possibile accedervi tramite la proprietà BaselineUsedForDriftCheckConstraints.

Per ulteriori informazioni, consulta Calcolo di base, rilevamento della deriva e ciclo di vita con e ClarifyCheck fasi QualityCheck in Amazon Model Building Pipelines SageMaker .

Utilizzo delle baseline

Facoltativamente, è possibile specificare per individuare la linea di base esistente. model_package_group_name Quindi, il ClarifyCheck passaggio richiama l'DriftCheckBaselinesultimo pacchetto modello approvato nel gruppo di pacchetti modello.

In alternativa, è possibile fornire una baseline precedente tramite il parametro supplied_baseline_constraints. Se si specificano sia model_package_group_name che supplied_baseline_constraints, la fase ClarifyCheck utilizza la baseline specificata dal parametro supplied_baseline_constraints.

Per ulteriori informazioni sull'utilizzo dei requisiti dei ClarifyCheck passaggi, consultate sagemaker.workflow.steps. ClarifyCheckStepin Amazon SageMaker SageMaker SDK per Python. Per un notebook Amazon SageMaker Studio Classic che mostra come usare ClarifyCheck step in SageMaker Pipelines, consulta sagemaker-pipeline-model-monitor-clarify-steps.ipynb.

Esempio Crea una fase ClarifyCheck per il controllo dei bias nei dati
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_bias_data_config = DataConfig( s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep']), label=0, dataset_type="text/csv", s3_analysis_config_output_path=data_bias_analysis_cfg_output_path, ) data_bias_config = BiasConfig( label_values_or_threshold=[15.0], facet_name=[8], facet_values_or_threshold=[[0.5]] ) data_bias_check_config = DataBiasCheckConfig( data_config=data_bias_data_config, data_bias_config=data_bias_config, )h data_bias_check_step = ClarifyCheckStep( name="DataBiasCheckStep", clarify_check_config=data_bias_check_config, check_job_config=check_job_config, skip_check=False, register_new_baseline=False supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json", model_package_group_name="MyModelPackageGroup" )

QualityCheck passo

Utilizza questa QualityCheck procedura per eseguire suggerimenti di base e verificare la deviazione rispetto a una linea di base precedente per quanto riguarda la qualità dei dati o la qualità dei modelli in una pipeline. È quindi possibile generare e registrare le linee di base con il model.register() metodo e passare l'output di tale metodo all'utilizzo.] Fase del modello step_args

Model Monitor può usare queste baseline per il controllo della deviazione per gli endpoint del modello in modo da non dover eseguire un suggerimento di baseline separatamente. La fase QualityCheck può anche estrarre le baseline per il controllo della deviazione dal registro dei modelli. La QualityCheck fase sfrutta il contenitore precostruito Amazon SageMaker Model Monitor. Questo contenitore dispone di una gamma di funzionalità di monitoraggio dei modelli, tra cui suggerimento di vincoli, generazione di statistiche e convalida dei vincoli rispetto a una linea di base. Per ulteriori informazioni, consulta Contenitore precostruito Amazon SageMaker Model Monitor.

Configurazione della fase QualityCheck

È possibile configurare il QualityCheck passaggio per eseguire solo uno dei seguenti tipi di controllo ogni volta che viene utilizzato in una pipeline.

  • Controllo della qualità dei dati

  • Controllo della qualità del modello

È possibile eseguire questa operazione impostando il parametro quality_check_config con uno dei valori di tipo di controllo seguenti:

  • DataQualityCheckConfig

  • ModelQualityCheckConfig

La fase QualityCheck avvia un processo di elaborazione che esegue il container esistente di Model Monitor e richiede configurazioni dedicate per il controllo e il processo di elaborazione. Le QualityCheckConfig e CheckJobConfig sono funzioni di supporto per queste configurazioni. Queste funzioni di supporto sono in linea con il modo in cui Model Monitor crea una linea di base per la qualità del modello o il monitoraggio della qualità dei dati. Per ulteriori informazioni sui suggerimenti di baseline di Model Monitor, consulta Creazione di una linea di base e Crea una base di riferimento per la qualità del modello.

Controllo dei comportamenti della fase per il controllo della deviazione

La fase QualityCheck richiede i seguenti due flag booleani per controllarne il comportamento:

  • skip_check: questo parametro indica se il controllo della deriva rispetto alla baseline precedente viene saltato o meno. Se è impostato su False, deve essere disponibile la baseline precedente del tipo di controllo configurato.

  • register_new_baseline: questo parametro indica se è possibile accedere a una baseline appena calcolata tramite le proprietà di fase BaselineUsedForDriftCheckConstraints e BaselineUsedForDriftCheckStatistics. Se è impostato su False, deve essere disponibile anche la baseline precedente del tipo di controllo configurato. È possibile accedervi tramite le proprietà BaselineUsedForDriftCheckConstraints e BaselineUsedForDriftCheckStatistics.

Per ulteriori informazioni, consulta Calcolo di base, rilevamento della deriva e ciclo di vita con e ClarifyCheck fasi QualityCheck in Amazon Model Building Pipelines SageMaker .

Utilizzo delle baseline

È possibile specificare una linea di base precedente direttamente tramite i parametri and. supplied_baseline_statistics supplied_baseline_constraints È inoltre possibile specificare model_package_group_name e il QualityCheck passaggio richiama l'DriftCheckBaselinesultimo pacchetto di modelli approvato nel gruppo di pacchetti di modelli.

Quando specificate quanto segue, la QualityCheck fase utilizza la linea di base specificata da supplied_baseline_constraints e supplied_baseline_statistics sul tipo di controllo della QualityCheck fase.

  • model_package_group_name

  • supplied_baseline_constraints

  • supplied_baseline_statistics

Per ulteriori informazioni sull'utilizzo dei requisiti della QualityCheck fase, consultate sagemaker.workflow.steps. QualityCheckStepin Amazon SageMaker SageMaker SDK per Python. Per un notebook Amazon SageMaker Studio Classic che mostra come usare QualityCheck step in SageMaker Pipelines, consulta sagemaker-pipeline-model-monitor-clarify-steps.ipynb.

Esempio Crea una fase QualityCheck per il controllo della qualità dei dati
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_quality_check_config = DataQualityCheckConfig( baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"), output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep']) ) data_quality_check_step = QualityCheckStep( name="DataQualityCheckStep", skip_check=False, register_new_baseline=False, quality_check_config=data_quality_check_config, check_job_config=check_job_config, supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json", supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json", model_package_group_name="MyModelPackageGroup" )

EMRpasso

Utilizza la EMRfase Amazon SageMaker Model Building Pipelines per:

Per ulteriori informazioni su AmazonEMR, consulta la sezione Guida introduttiva ad Amazon EMR.

Il EMR passaggio richiede che EMRStepConfig includa la posizione del JAR file utilizzato dal EMR cluster Amazon e gli eventuali argomenti da passare. Fornisci anche l'ID del EMR cluster Amazon se desideri eseguire la fase su un EMR cluster in esecuzione. Puoi anche passare la configurazione del cluster per eseguire il EMR passaggio su un cluster che viene creato, gestito e terminato per te. Le sezioni seguenti includono esempi e collegamenti a notebook di esempio che illustrano entrambi i metodi.

Nota
  • EMRi passaggi richiedono che il ruolo passato alla pipeline disponga di autorizzazioni aggiuntive. Allega la policy AWS gestita: AmazonSageMakerPipelinesIntegrations al tuo ruolo nella pipeline o assicurati che il ruolo includa le autorizzazioni contenute in quella policy.

  • EMRstep non è supportato su serverless. EMR Inoltre, non è supportato su Amazon EMR onEKS.

  • Se elabori un EMR passaggio su un cluster in esecuzione, puoi utilizzare solo un cluster che si trova in uno dei seguenti stati:

    • STARTING

    • BOOTSTRAPPING

    • RUNNING

    • WAITING

  • Se si elaborano EMR passaggi su un cluster in esecuzione, è possibile avere al massimo 256 EMR passaggi in uno PENDING stato su un EMR cluster. EMRi passaggi inviati oltre questo limite provocano un errore di esecuzione della pipeline. Puoi valutare l'utilizzo di Policy di ripetizione per la fasi della pipeline.

  • È possibile specificare l'ID o la configurazione del cluster, ma non entrambi.

  • La EMR fase si basa su Amazon EventBridge per monitorare le modifiche nella EMR fase o nello stato del cluster. Se elabori il tuo EMR job Amazon su un cluster in esecuzione, la EMR fase utilizza la SageMakerPipelineExecutionEMRStepStatusUpdateRule regola per monitorare lo stato della EMR fase. Se elabori il processo su un cluster creato dalla EMR fase, la fase utilizza la SageMakerPipelineExecutionEMRClusterStatusRule regola per monitorare i cambiamenti nello stato del cluster. Se vedi una di queste EventBridge regole nel tuo AWS account, non eliminarla, altrimenti il EMR passaggio potrebbe non essere completato.

Avvia un nuovo lavoro su un EMR cluster Amazon in esecuzione

Per avviare un nuovo lavoro su un EMR cluster Amazon in esecuzione, passa l'ID del cluster come stringa all'cluster_idargomento diEMRStep. L'esempio seguente illustra questa procedura.

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_config = EMRStepConfig( jar="jar-location", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) step_emr = EMRStep ( name="EMRSampleStep", # required cluster_id="j-1ABCDEFG2HIJK", # include cluster_id to use a running cluster step_config=emr_config, # required display_name="My EMR Step", description="Pipeline step to execute EMR job" )

Per un taccuino di esempio che ti guida attraverso un esempio completo, vedi SageMaker Pipelines EMR Step With Running EMR Cluster.

Avvia un nuovo lavoro su un nuovo EMR cluster Amazon

Per avviare un nuovo job su un nuovo cluster EMRStep creato per te, fornisci la configurazione del cluster sotto forma di dizionario. Il dizionario deve avere la stessa struttura di una RunJobFlowrichiesta. Tuttavia, non includere i seguenti campi nella configurazione del cluster:

  • [Name]

  • [Steps]

  • [AutoTerminationPolicy]

  • [Instances][KeepJobFlowAliveWhenNoSteps]

  • [Instances][TerminationProtected]

Tutti gli altri argomenti RunJobFlow sono disponibili per l'uso nella configurazione del cluster. Per i dettagli sulla sintassi della richiesta, vedere RunJobFlow.

L'esempio seguente passa una configurazione del cluster a una definizione di EMR fase. Questo richiede il passaggio per avviare un nuovo processo su un nuovo EMR cluster. La configurazione del EMR cluster in questo esempio include le specifiche per i nodi principali e principali EMR del cluster. Per ulteriori informazioni sui tipi di EMR nodi Amazon, consulta Comprendi i tipi di nodi: nodi primari, core e task.

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_step_config = EMRStepConfig( jar="jar-location", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) # include your cluster configuration as a dictionary emr_cluster_config = { "Applications": [ { "Name": "Spark", } ], "Instances":{ "InstanceGroups":[ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.2xlarge" }, { "InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "m5.2xlarge" } ] }, "BootstrapActions":[], "ReleaseLabel": "emr-6.6.0", "JobFlowRole": "job-flow-role", "ServiceRole": "service-role" } emr_step = EMRStep( name="emr-step", cluster_id=None, display_name="emr_step", description="MyEMRStepDescription", step_config=emr_step_config, cluster_config=emr_cluster_config )

Per un notebook di esempio che ti guida attraverso un esempio completo, consulta SageMaker Pipelines EMR Step With Cluster Lifecycle Management.

Fase di lavoro del notebook

Usa NotebookJobStep a per eseguire SageMaker Notebook Job in modo non interattivo come fase della pipeline. Per ulteriori informazioni su SageMaker Notebook Jobs, consulta. SageMaker Lavori su notebook

A NotebookJobStep richiede almeno un notebook, un'immagine URI e un nome del kernel in input. Per ulteriori informazioni sui requisiti dei passaggi di Notebook Job e su altri parametri che è possibile impostare per personalizzare il passaggio, vedere sagemaker.workflow.steps. NotebookJobStep.

L'esempio seguente utilizza un numero minimo di argomenti per definire unNotebookJobStep.

from sagemaker.workflow.notebook_job_step import NotebookJobStep notebook_job_step = NotebookJobStep( input_notebook=input_notebook, image_uri=image_uri, kernel_name=kernel_name )

La fase NotebookJobStep della pipeline viene considerata come un lavoro di SageMaker routine. Di conseguenza, tenete traccia dello stato di esecuzione nella dashboard del lavoro del notebook Studio Classic UI includendo tag specifici nell'tagsargomento. Per ulteriori dettagli sui tag da includere, consultaVisualizza i tuoi lavori su notebook nella dashboard dell'interfaccia utente di Studio.

Inoltre, se si pianifica il lavoro sul notebook utilizzando SageMaker PythonSDK, è possibile specificare solo determinate immagini per eseguire il lavoro sul notebook. Per ulteriori informazioni, consulta Vincoli di immagine per i lavori su notebook Python SageMaker SDK.

Fase fallimentare

Usa FailStep a per interrompere l'esecuzione di Amazon SageMaker Model Building Pipelines quando non viene raggiunta la condizione o lo stato desiderato. Ciò contrassegna anche l'esecuzione di quella pipeline come fallita. FailStep consente inoltre di inserire un messaggio di errore personalizzato, che indica la causa dell'errore di esecuzione della pipeline.

Nota

Quando una FailStep e altre fasi della pipeline vengono eseguite contemporaneamente, la pipeline non si interrompe finché non vengono completate tutte le fasi simultanee.

Limitazioni per l'utilizzo di FailStep

  • Non è possibile aggiungere una FailStep all'elenco DependsOn delle altre fasi. Per ulteriori informazioni, consulta Dipendenza personalizzata tra i passaggi.

  • Le altre fasi non possono fare riferimento a FailStep. È sempre l'ultima fase dell'esecuzione di una pipeline.

  • Non è possibile riprovare l'esecuzione di una pipeline che termina con una FailStep.

È possibile creare il ErrorMessage di FailStep nella forma di stringa di testo statica. In alternativa, puoi anche utilizzare i Parametri della pipeline, un'operazione Join o altre proprietà della fase per creare un messaggio di errore più informativo.

Il frammento di codice di esempio seguente utilizza una FailStep con un ErrorMessage configurato con Parametri della pipeline e un'operazione Join.

from sagemaker.workflow.fail_step import FailStep from sagemaker.workflow.functions import Join from sagemaker.workflow.parameters import ParameterInteger mse_threshold_param = ParameterInteger(name="MseThreshold", default_value=5) step_fail = FailStep( name="AbaloneMSEFail", error_message=Join( on=" ", values=["Execution failed due to MSE >", mse_threshold_param] ), )

proprietà dei passaggi

Utilizzate l'propertiesattributo per aggiungere dipendenze tra i dati tra le fasi della pipeline. SageMaker Le pipeline utilizzano queste dipendenze tra i dati per costruire la definizione della DAG pipeline. È possibile fare riferimento a queste proprietà come valori segnaposto e vengono risolte in fase di runtime.

L'propertiesattributo di un passo SageMaker Pipelines corrisponde all'oggetto restituito da una Describe chiamata per il tipo di processo corrispondente. SageMaker Per ogni tipo di processo, la chiamata Describe restituisce il seguente oggetto di risposta:

Per verificare quali proprietà sono riferibili per ogni tipo di passaggio durante la creazione della dipendenza dei dati, consulta Data Dependency - Property Reference in Amazon Python. SageMaker SDK

Parallelismo a fasi

Quando un passaggio non dipende da nessun altro passaggio, viene eseguito immediatamente dopo l'esecuzione della pipeline. Tuttavia, l'esecuzione di troppe fasi della pipeline in parallelo può esaurire rapidamente le risorse disponibili. Controlla il numero di fasi simultanee per l'esecuzione di una pipeline con ParallelismConfiguration.

L'esempio seguente utilizza ParallelismConfiguration per impostare il limite di fasi simultanee a cinque.

pipeline.create( parallelism_config=ParallelismConfiguration(5), )

Dipendenza dei dati tra i passaggi

È possibile definire la struttura DAG specificando le relazioni tra i dati tra i passaggi. Per creare dipendenze dei dati tra le fasi, passa le proprietà di una fase come input a un'altra fase della pipeline. La fase che riceve l'input viene avviata solo dopo il termine dell'esecuzione della fase che fornisce l'input.

Una dipendenza dai dati utilizza la JsonPath notazione nel seguente formato. Questo formato attraversa il file delle proprietà. JSON Ciò significa che puoi aggiungerne tanti <property> istanze necessarie per raggiungere la proprietà annidata desiderata nel file. Per ulteriori informazioni sulla JsonPath notazione, consulta il repository. JsonPath

<step_name>.properties.<property>.<property>

Di seguito viene illustrato come specificare un bucket Amazon S3 utilizzando la proprietà ProcessingOutputConfig di una fase di elaborazione.

step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri

Per creare la dipendenza dei dati, passa il bucket a una fase di addestramento come segue.

from sagemaker.workflow.pipeline_context import PipelineSession sklearn_train = SKLearn(..., sagemaker_session=PipelineSession()) step_train = TrainingStep( name="CensusTrain", step_args=sklearn_train.fit(inputs=TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train_data"].S3Output.S3Uri )) )

Per verificare quali proprietà sono riferibili per ogni tipo di passaggio durante la creazione della dipendenza dei dati, consulta Data Dependency - Property Reference in Amazon Python. SageMaker SDK

Dipendenza personalizzata tra i passaggi

Quando si specifica una dipendenza dai dati, SageMaker Pipelines fornisce la connessione dati tra i passaggi. In alternativa, un passaggio può accedere ai dati di un passaggio precedente senza utilizzare SageMaker direttamente Pipelines. In questo caso, puoi creare una dipendenza personalizzata che indichi a SageMaker Pipelines di non iniziare un passaggio fino al termine dell'esecuzione di un altro passaggio. Si crea una dipendenza personalizzata specificando l'attributo DependsOn di una fase.

Ad esempio, quanto segue definisce una fase C che inizia solo dopo la fine dell'esecuzione sia della fase A che della fase B.

{ 'Steps': [ {'Name':'A', ...}, {'Name':'B', ...}, {'Name':'C', 'DependsOn': ['A', 'B']} ] }

SageMaker Pipelines genera un'eccezione di convalida se la dipendenza creerebbe una dipendenza ciclica.

L'esempio seguente crea una fase di addestramento che inizia al termine di una fase di elaborazione.

processing_step = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step])

L'esempio seguente crea una fase di addestramento che non si avvia fino al termine dell'esecuzione di due diverse fasi di elaborazione.

processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step_1, processing_step_2])

Di seguito viene fornito un modo alternativo per creare la dipendenza personalizzata.

training_step.add_depends_on([processing_step_1]) training_step.add_depends_on([processing_step_2])

L'esempio seguente crea una fase di addestramento che riceve input da una fase di elaborazione e attende il termine dell'esecuzione di una diversa fase di elaborazione.

processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep( ..., inputs=TrainingInput( s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[ "train_data" ].S3Output.S3Uri ) training_step.add_depends_on([processing_step_2])

Nell'esempio seguente viene illustrato come recuperare un elenco di stringhe delle dipendenze personalizzate di una fase.

custom_dependencies = training_step.depends_on

Usa un'immagine personalizzata in un passaggio

Puoi utilizzare una qualsiasi delle immagini disponibili del SageMaker Deep Learning Container quando crei una fase della tua pipeline.

Inoltre puoi utilizzare il tuo container con le fasi della pipeline. Poiché non è possibile creare un'immagine da Studio Classic, è necessario creare l'immagine utilizzando un altro metodo prima di utilizzarla con SageMaker Pipelines.

Per utilizzare il vostro contenitore personale durante la creazione dei passaggi per la pipeline, includete l'immagine URI nella definizione dello estimatore. Per ulteriori informazioni sull'utilizzo del proprio contenitore con SageMaker, consulta Using Docker Containers with. SageMaker