Étapes du pipeline de modélisation - Amazon SageMaker

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Étapes du pipeline de modélisation

SageMaker Les pipelines sont composés d'étapes. Ces étapes définissent les actions effectuées par le pipeline et les relations entre les étapes utilisant les propriétés.

Types d'étapes

La section suivante décrit les exigences de chaque type d'étape et fournit un exemple d'implémentation de l'étape. Ce ne sont pas des implémentations fonctionnelles, car elles ne fournissent pas les ressources et les entrées nécessaires. Pour obtenir un tutoriel qui met en œuvre ces étapes, veuillez consulter Création et gestion de SageMaker pipelines.

Note

Vous pouvez également créer une étape à partir de votre code d'apprentissage automatique local en la convertissant en étape SageMaker Pipelines avec le @step décorateur. Pour plus d’informations, consultez décorateur @step.

Amazon SageMaker Model Building Pipelines prend en charge les types d'étapes suivants :

décorateur @step

Vous pouvez créer une étape à partir du code d'apprentissage automatique local à l'aide du @step décorateur. Après avoir testé votre code, vous pouvez convertir la fonction en étape de SageMaker pipeline en l'annotant à l'aide du @step décorateur. SageMaker Pipelines crée et exécute un pipeline lorsque vous transmettez la sortie de la fonction @step -decorated en tant qu'étape à votre pipeline. Vous pouvez également créer un pipeline DAG en plusieurs étapes qui inclut une ou plusieurs fonctions @step décorées ainsi que des étapes de SageMaker pipeline traditionnelles. Pour plus de détails sur la création d'une étape avec le @step décorateur, consultezCode ift-and-shift Python L avec le décorateur @step.

Étape de traitement

Utilisez une étape de traitement pour créer une tâche de traitement pour le traitement des données. Pour plus d'informations sur les tâches de traitement, veuillez consulter Données de traitement et Modèles d'évaluation.

Une étape de traitement nécessite un processeur, un script Python qui définit le code de traitement, les sorties pour le traitement et les arguments de tâche. L'exemple suivant montre comment créer une définition ProcessingStep.

from sagemaker.sklearn.processing import SKLearnProcessor sklearn_processor = SKLearnProcessor(framework_version='1.0-1', role=<role>, instance_type='ml.m5.xlarge', instance_count=1)
from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep inputs = [ ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] step_process = ProcessingStep( name="AbaloneProcess", step_args = sklearn_processor.run(inputs=inputs, outputs=outputs, code="abalone/preprocessing.py") )

Transmission des paramètres d'exécution

L'exemple suivant montre comment transmettre des paramètres d'exécution d'un PySpark processeur à unProcessingStep.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.spark.processing import PySparkProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep pipeline_session = PipelineSession() pyspark_processor = PySparkProcessor( framework_version='2.4', role=<role>, instance_type='ml.m5.xlarge', instance_count=1, sagemaker_session=pipeline_session, ) step_args = pyspark_processor.run( inputs=[ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"),], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="preprocess.py", arguments=None, ) step_process = ProcessingStep( name="AbaloneProcess", step_args=step_args, )

Pour plus d'informations sur les exigences relatives aux étapes de traitement, consultez le document sagemaker.workflow.steps. ProcessingStepdocumentation. Pour un exemple détaillé, consultez Définir une étape de traitement pour l'ingénierie des fonctionnalités dans le carnet d'exemples Orchestrate Jobs to Train and Evaluate Models with Amazon SageMaker Pipelines.

Étape d'entraînement

Vous utilisez une étape d'entraînement pour créer une tâche d'entraînement afin d'entraîner un modèle. Pour plus d'informations sur les métiers de formation, consultez Train a Model with Amazon SageMaker.

Une étape d'entraînement nécessite un estimateur, ainsi que des entrées de données d'entraînement et de validation. L'exemple suivant montre comment créer une définition TrainingStep. Pour plus d'informations sur les exigences relatives aux étapes de formation, consultez le document sagemaker.workflow.steps. TrainingStepdocumentation.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep from sagemaker.xgboost.estimator import XGBoost pipeline_session = PipelineSession() xgb_estimator = XGBoost(..., sagemaker_session=pipeline_session) step_args = xgb_estimator.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) } ) step_train = TrainingStep( name="TrainAbaloneModel", step_args=step_args, )

Étape de réglage

Vous utilisez une étape de réglage pour créer une tâche de réglage d'hyperparamètres, également appelé optimisation des hyperparamètres (HPO). Une tâche de réglage d'hyperparamètres exécute plusieurs tâches d'entraînement, chacune produisant une version de modèle. Pour plus d'informations sur le réglage d'hyperparamètres, veuillez consulter Effectuez le réglage automatique du modèle avec SageMaker.

La tâche de réglage est associée à l' SageMaker expérience pour le pipeline, les tâches de formation étant créées à titre d'essais. Pour plus d’informations, consultez Intégration d'Experiments.

Une étape de réglage nécessite des entrées HyperparameterTuneret un entraînement. Vous pouvez entraîner à nouveau les tâches de réglage précédentes en spécifiant le paramètre warm_start_config du HyperparameterTuner. Pour plus d'informations sur le réglage d'hyperparamètres et le démarrage à chaud, veuillez consulter Exécution d'une tâche de réglage des hyperparamètres avec démarrage à chaud.

Vous utilisez la méthode get_top_model_s3_uri du sagemaker.workflow.steps. TuningStepclasse pour obtenir l'artefact du modèle à partir de l'une des versions les plus performantes du modèle. Pour un bloc-notes expliquant comment utiliser une étape de réglage dans un SageMaker pipeline, consultez sagemaker-pipelines-tuning-step.ipynb.

Important

Les étapes de réglage ont été introduites dans le SDK Amazon SageMaker Python v2.48.0 et dans Amazon SageMaker Studio Classic v3.8.0. Vous devez mettre à jour Studio Classic avant d'utiliser une étape de réglage, sinon le DAG du pipeline ne s'affichera pas. Pour mettre à jour Studio Classic, voirArrêter et mettre à jour SageMaker Studio Classic.

L'exemple suivant montre comment créer une définition TuningStep.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.tuner import HyperparameterTuner from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TuningStep tuner = HyperparameterTuner(..., sagemaker_session=PipelineSession()) step_tuning = TuningStep( name = "HPTuning", step_args = tuner.fit(inputs=TrainingInput(s3_data="s3://my-bucket/my-data")) )

Obtenir la meilleure version de modèle

L'exemple suivant montre comment obtenir la meilleure version de modèle à partir de la tâche de réglage à l'aide de la méthode get_top_model_s3_uri. Tout au plus, les 50 versions les plus performantes sont disponibles classées par HyperParameterTuningJobobjectif. L'argument top_k est un index dans les versions, où top_k=0 est la version la plus performante et top_k=49 est la version la moins performante.

best_model = Model( image_uri=image_uri, model_data=step_tuning.get_top_model_s3_uri( top_k=0, s3_bucket=sagemaker_session.default_bucket() ), ... )

Pour plus d'informations sur les exigences relatives aux étapes de réglage, consultez le document sagemaker.workflow.steps. TuningStepdocumentation.

Étape AutoML

Utilisez l'API AutoML pour créer une tâche AutoML afin d'entraîner automatiquement un modèle. Pour plus d'informations sur les tâches AutoML, consultez Automatiser le développement de modèles avec Amazon SageMaker Autopilot.

Note

Actuellement, l'étape AutoML ne prend en charge que le mode d'entraînement d'assemblage.

L'exemple suivant montre comment créer une définition avec AutoMLStep.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.automl_step import AutoMLStep pipeline_session = PipelineSession() auto_ml = AutoML(..., role="<role>", target_attribute_name="my_target_attribute_name", mode="ENSEMBLING", sagemaker_session=pipeline_session) input_training = AutoMLInput( inputs="s3://my-bucket/my-training-data", target_attribute_name="my_target_attribute_name", channel_type="training", ) input_validation = AutoMLInput( inputs="s3://my-bucket/my-validation-data", target_attribute_name="my_target_attribute_name", channel_type="validation", ) step_args = auto_ml.fit( inputs=[input_training, input_validation] ) step_automl = AutoMLStep( name="AutoMLStep", step_args=step_args, )

Obtenir la meilleure version de modèle

L'étape AutoML entraîne automatiquement plusieurs modèles candidats. Vous pouvez obtenir le modèle avec la meilleure métrique objective à partir de la tâche AutoML à l'aide de la méthode get_best_auto_ml_model et d'un role IAM pour accéder aux artefacts du modèle comme suit.

best_model = step_automl.get_best_auto_ml_model(role=<role>)

Pour plus d'informations, consultez l'étape AutoML du SDK SageMaker Python.

Étape du modèle

Utilisez un ModelStep pour créer ou enregistrer un SageMaker modèle. Pour plus d'informations sur les ModelStep exigences, consultez le document sagemaker.workflow.model_step. ModelStepdocumentation.

Création d’un modèle

Vous pouvez utiliser un ModelStep pour créer un SageMaker modèle. A ModelStep nécessite des artefacts de modèle et des informations sur le type d' SageMaker instance que vous devez utiliser pour créer le modèle. Pour plus d'informations sur SageMaker les modèles, consultez Train a Model with Amazon SageMaker.

L'exemple suivant montre comment créer une définition ModelStep.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.model import Model from sagemaker.workflow.model_step import ModelStep step_train = TrainingStep(...) model = Model( image_uri=pytorch_estimator.training_image_uri(), model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=PipelineSession(), role=role, ) step_model_create = ModelStep( name="MyModelCreationStep", step_args=model.create(instance_type="ml.m5.xlarge"), )

Enregistrement d'un modèle

Vous pouvez utiliser a ModelStep pour enregistrer un sagemaker.model.Model ou un dans sagemaker.pipeline.PipelineModel le registre des SageMaker modèles Amazon. Un PipelineModel représente un pipeline d'inférence, qui est un modèle composé d'une séquence linéaire de conteneurs qui traitent les demandes d'inférence. Pour savoir comment enregistrer un modèle, veuillez consulter Enregistrer et déployer des modèles avec Model Registry.

L'exemple suivant montre comment créer une ModelStep qui enregistre un PipelineModel.

import time from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel pipeline_session = PipelineSession() code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel( model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=pipeline_session, py_version='py3' ) xgboost_model = XGBoostModel( model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=pipeline_session, role=role ) from sagemaker.workflow.model_step import ModelStep from sagemaker import PipelineModel pipeline_model = PipelineModel( models=[sklearn_model, xgboost_model], role=role,sagemaker_session=pipeline_session, ) register_model_step_args = pipeline_model.register( content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', ) step_model_registration = ModelStep( name="AbaloneRegisterModel", step_args=register_model_step_args, )

CreateModel Étape

Important

Nous vous recommandons Étape du modèle de l'utiliser pour créer des modèles à partir de la version 2.90.0 du SDK Python SageMaker . CreateModelStepcontinuera de fonctionner dans les versions précédentes du SDK SageMaker Python, mais n'est plus activement pris en charge.

Vous utilisez une CreateModel étape pour créer un SageMaker modèle. Pour plus d'informations sur SageMaker les modèles, consultez Train a Model with Amazon SageMaker.

Une étape de création de modèle nécessite des artefacts de modèle et des informations sur le type d' SageMaker instance que vous devez utiliser pour créer le modèle. L'exemple suivant montre comment créer une définition d'étape CreateModel. Pour plus d'informations sur les exigences relatives aux CreateModel étapes, consultez le document sagemaker.workflow.steps. CreateModelDocumentation des étapes.

from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=best_model, inputs=inputs )

RegisterModel Étape

Important

Nous vous recommandons Étape du modèle de l'utiliser pour enregistrer des modèles à partir de la version 2.90.0 du SDK Python SageMaker . RegisterModelcontinuera de fonctionner dans les versions précédentes du SDK SageMaker Python, mais n'est plus activement pris en charge.

Vous utilisez une RegisterModel étape pour enregistrer un SageMaker.Model.Model ou un sagemaker.pipeline. PipelineModelavec le registre des SageMaker modèles Amazon. Un PipelineModel représente un pipeline d'inférence, qui est un modèle composé d'une séquence linéaire de conteneurs qui traitent les demandes d'inférence.

Pour savoir comment enregistrer un modèle, veuillez consulter Enregistrer et déployer des modèles avec Model Registry. Pour plus d'informations sur les exigences relatives aux RegisterModel étapes, consultez le document sagemaker.workflow.step_collections. RegisterModeldocumentation.

L'exemple suivant montre comment créer une étape RegisterModel qui enregistre un PipelineModel.

import time from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel(model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=sagemaker_session, py_version='py3') xgboost_model = XGBoostModel(model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=sagemaker_session, role=role) from sagemaker.workflow.step_collections import RegisterModel from sagemaker import PipelineModel pipeline_model = PipelineModel(models=[sklearn_model,xgboost_model],role=role,sagemaker_session=sagemaker_session) step_register = RegisterModel( name="AbaloneRegisterModel", model=pipeline_model, content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', )

Si le model n'est pas fourni, l'étape RegisterModel nécessite un estimateur, comme illustré dans l'exemple suivant.

from sagemaker.workflow.step_collections import RegisterModel step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

Étape de transformation

Pour exécuter des inférences sur un jeu de données entier, vous utilisez une étape de transformation pour une transformation par lots. Pour plus d'informations sur la transformation par lots, veuillez consulter Exécution de transformations par lots avec des pipelines d'inférence.

Une étape de transformation nécessite un transformateur et les données sur lesquelles exécuter la transformation par lots. L'exemple suivant montre comment créer une définition d'étape Transform. Pour plus d'informations sur les exigences relatives aux Transform étapes, consultez le document sagemaker.workflow.steps. TransformStepdocumentation.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.transformer import Transformer from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep transformer = Transformer(..., sagemaker_session=PipelineSession()) step_transform = TransformStep( name="AbaloneTransform", step_args=transformer.transform(data="s3://my-bucket/my-data"), )

Étape de condition

Vous utilisez une étape de condition pour évaluer la condition des propriétés de l'étape afin d'évaluer quelle action doit être effectuée ensuite dans le pipeline.

Une étape de condition nécessite une liste de conditions, une liste d'étapes à exécuter si la condition a pour valeur true et une liste des étapes à exécuter si la condition a pour valeur false. L'exemple suivant montre comment créer une définition ConditionStep.

Limites

  • SageMaker Pipelines ne prend pas en charge l'utilisation d'étapes conditionnelles imbriquées. Vous ne pouvez pas transmettre une étape de condition comme entrée pour une autre étape de condition.

  • Une étape de condition ne peut pas utiliser des étapes identiques dans les deux branches. Si vous avez besoin de la même fonctionnalité d'étape dans les deux branches, dupliquez l'étape et donnez-lui un nom différent.

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 ) step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[] )

Pour plus d'informations sur les ConditionStep exigences, consultez le document sagemaker.workflow.condition_step. ConditionStepRéférence d'API. Pour plus d'informations sur les conditions prises en charge, consultez Amazon SageMaker Model Building Pipelines - Conditions dans la documentation du SDK SageMaker Python.

Étape de rappel

Vous utilisez une Callback étape pour intégrer dans votre flux de travail des processus et AWS des services supplémentaires qui ne sont pas directement fournis par Amazon SageMaker Model Building Pipelines. Lorsqu'une étape de Callback s'exécute, la procédure suivante se produit :

  • SageMaker Pipelines envoie un message à une file d'attente Amazon Simple Queue Service (Amazon SQS) spécifiée par le client. Le message contient un jeton SageMaker généré par Pipelines et une liste de paramètres d'entrée fournie par le client. Après avoir envoyé le message, SageMaker Pipelines attend une réponse du client.

  • Le client récupère le message dans la file d'attente Amazon SQS et démarre son processus personnalisé.

  • Lorsque le processus est terminé, le client appelle l'une des API suivantes et envoie le jeton généré par les SageMaker pipelines :

  • L'appel d'API SageMaker oblige les pipelines à poursuivre le processus de pipeline ou à échouer.

Pour plus d'informations sur les exigences relatives aux Callback étapes, consultez le document sagemaker.workflow.callback_step. CallbackStepdocumentation. Pour une solution complète, voir Étendre les SageMaker pipelines pour inclure des étapes personnalisées à l'aide d'étapes de rappel.

Important

Callbackles étapes ont été introduites dans Amazon SageMaker Python SDK v2.45.0 et Amazon SageMaker Studio Classic v3.6.2. Vous devez mettre à jour Studio Classic avant d'utiliser une Callback étape, sinon le DAG du pipeline ne s'affichera pas. Pour mettre à jour Studio Classic, voirArrêter et mettre à jour SageMaker Studio Classic.

L'exemple suivant illustre une mise en œuvre de la procédure précédente.

from sagemaker.workflow.callback_step import CallbackStep step_callback = CallbackStep( name="MyCallbackStep", sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue", inputs={...}, outputs=[...] ) callback_handler_code = ' import boto3 import json def handler(event, context): sagemaker_client=boto3.client("sagemaker") for record in event["Records"]: payload=json.loads(record["body"]) token=payload["token"] # Custom processing # Call SageMaker to complete the step sagemaker_client.send_pipeline_execution_step_success( CallbackToken=token, OutputParameters={...} ) '
Note

Paramètres de sortie pour CallbackStep ne doit pas être imbriqué. Par exemple, si vous utilisez un dictionnaire imbriqué comme paramètre de sortie, le dictionnaire est traité comme une chaîne unique (par ex. {"output1": "{\"nested_output1\":\"my-output\"}"}). Si vous fournissez une valeur imbriquée, lorsque vous essayez de faire référence à un paramètre de sortie particulier, une erreur client non réessayable est renvoyée.

Comportement d'arrêt

Un processus de pipeline ne s'arrête pas lorsqu'une étape de Callback est en cours d'exécution.

Lorsque vous appelez StopPipelineExecution sur un processus de pipeline comportant une Callback étape en cours d'exécution, SageMaker Pipelines envoie un message Amazon SQS supplémentaire à la file d'attente SQS spécifiée. Le corps du message SQS contient un champ Status (Statut), qui est défini sur Stopping. L'exemple suivant montre le corps d'un message SQS.

{ "token": "26vcYbeWsZ", "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a", "arguments": { "number": 5, "stringArg": "some-arg", "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv" }, "status": "Stopping" }

Vous devez ajouter une logique à votre client de message Amazon SQS pour effectuer toutes les actions nécessaires (par exemple, le nettoyage des ressources) dès la réception du message, suivi d'un appel à SendPipelineExecutionStepSuccess ou SendPipelineExecutionStepFailure.

Ce n'est que lorsque SageMaker Pipelines reçoit l'un de ces appels qu'il arrête le processus de pipeline.

Étape Lambda

Vous utilisez une étape Lambda pour exécuter une AWS Lambda fonction. Vous pouvez exécuter une fonction Lambda existante ou SageMaker créer et exécuter une nouvelle fonction Lambda. Pour un bloc-notes expliquant comment utiliser une étape Lambda dans un pipeline, consultez SageMaker sagemaker-pipelines-lambda-step.ipynb.

Important

Les étapes Lambda ont été introduites dans le SDK Amazon SageMaker Python v2.51.0 et dans Amazon SageMaker Studio Classic v3.9.1. Vous devez mettre à jour Studio Classic avant d'utiliser une étape Lambda, sinon le DAG du pipeline ne s'affichera pas. Pour mettre à jour Studio Classic, voirArrêter et mettre à jour SageMaker Studio Classic.

SageMaker fournit la classe SageMaker.Lambda_Helper.Lambda pour créer, mettre à jour, invoquer et supprimer des fonctions Lambda. Lambdaporte la signature suivante.

Lambda( function_arn, # Only required argument to invoke an existing Lambda function # The following arguments are required to create a Lambda function: function_name, execution_role_arn, zipped_code_dir, # Specify either zipped_code_dir and s3_bucket, OR script s3_bucket, # S3 bucket where zipped_code_dir is uploaded script, # Path of Lambda function script handler, # Lambda handler specified as "lambda_script.lambda_handler" timeout, # Maximum time the Lambda function can run before the lambda step fails ... )

Le sagemaker.workflow.lambda_step. LambdaStepla classe a un lambda_func argument de typeLambda. Pour appeler une fonction Lambda existante, la seule exigence est de fournir l'Amazon Resource Name (ARN) de la fonction à function_arn. Si vous ne définissez aucune valeur pour function_arn, vous devez spécifier handler et l'un des éléments suivants :

  • zipped_code_dir – Chemin de la fonction Lambda zippée

    s3_bucket – Compartiment Amazon S3 où zipped_code_dir doit être téléchargé

  • script – Chemin d'accès du fichier script de la fonction Lambda

L'exemple suivant montre comment créer une définition d'étape Lambda qui appelle une fonction Lambda existante.

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda" ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

L'exemple suivant montre comment créer une définition d'étape Lambda qui appelle une fonction Lambda existante.

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_name="split-dataset-lambda", execution_role_arn=execution_role_arn, script="lambda_script.py", handler="lambda_script.lambda_handler", ... ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

Entrées et sorties

Si votre fonction Lambda a des entrées ou des sorties, elles doivent également être définies dans votre étape Lambda.

Note

Les paramètres d'entrée et de sortie ne doivent pas être imbriqués. Par exemple, si vous utilisez un dictionnaire imbriqué comme paramètre de sortie, le dictionnaire est traité comme une chaîne unique (par ex. {"output1": "{\"nested_output1\":\"my-output\"}"}). Si vous fournissez une valeur imbriquée et que vous essayez d'y faire référence ultérieurement, une erreur client non réessayable est renvoyée.

Lors de la définition de l'étape Lambda, inputs doit être un dictionnaire de paires clé-valeur. Chaque valeur du dictionnaire inputs doit être de type primitif (chaîne, entier ou flottante). Les objets imbriqués ne sont pas pris en charge. Si elle n'est pas définie, la valeur inputs est définie par défaut sur None.

La valeur outputs doit être une liste de clés. Ces clés font référence à un dictionnaire défini dans la sortie de la fonction Lambda. Comme inputs, ces clés doivent être de type primitif et les objets imbriqués ne sont pas pris en charge.

Délai d'expiration et comportement d'arrêt

La classe Lambda a un argument timeout qui spécifie la durée maximale d'exécution de la fonction Lambda. La valeur par défaut est de 120 secondes, avec une valeur maximum de 10 minutes. Si la fonction Lambda est en cours d'exécution lorsque le délai d'expiration est atteint, l'étape Lambda échoue. Cependant, la fonction Lambda continue de s'exécuter.

Un processus de pipeline ne peut pas être arrêté pendant qu'une étape Lambda est en cours d'exécution, car la fonction Lambda invoquée par l'étape Lambda ne peut pas être arrêtée. Si vous tentez d'arrêter le processus pendant que la fonction Lambda est en cours d'exécution, le pipeline attend que la fonction Lambda se termine ou que le délai d'expiration soit atteint, selon la première éventualité, puis s'arrête. Si la fonction Lambda se termine, le statut de processus du pipeline est Stopped. Si le délai d'expiration est atteint, le statut de processus du pipeline est Failed.

ClarifyCheck Étape

Vous pouvez utiliser l'étape ClarifyCheck afin d'effectuer des vérifications de dérive de référence par rapport aux références précédentes pour l'analyse de biais et l'explicabilité de modèle. Vous pouvez ensuite générer et enregistrer vos références avec le model.register() et transmettre la sortie de cette méthode à Étape du modèle en utilisant step_args. Ces lignes de base pour le contrôle de dérive peuvent être utilisées par Amazon SageMaker Model Monitor pour les points de terminaison de votre modèle, de sorte que vous n'avez pas besoin de faire une suggestion de référence séparément. L'étape ClarifyCheck peut également extraire des références pour la vérification de dérive à partir du registre des modèles. Cette ClarifyCheck étape s'appuie sur le conteneur prédéfini Amazon SageMaker Clarify qui fournit une gamme de fonctionnalités de surveillance des modèles, notamment la suggestion de contraintes et la validation des contraintes par rapport à une référence donnée. Pour plus d’informations, consultez Commencez avec un conteneur SageMaker Clarify.

Configuration de l' ClarifyCheck étape

Vous pouvez configurer l'étape ClarifyCheck pour effectuer l'un des types de vérification suivants chaque fois qu'il est utilisé dans un pipeline.

  • Vérification de biais des données

  • Vérification de biais de modèle

  • Vérification d'explicabilité de modèle

Pour ce faire, définissez le paramètre clarify_check_config avec l'une des valeurs de type de vérification suivantes :

  • DataBiasCheckConfig

  • ModelBiasCheckConfig

  • ModelExplainabilityCheckConfig

L'ClarifyCheckétape lance une tâche de traitement qui exécute le conteneur SageMaker prédéfini Clarify et nécessite des configurations dédiées pour la vérification et la tâche de traitement. ClarifyCheckConfiget CheckJobConfig sont des fonctions d'assistance pour ces configurations qui correspondent à la façon dont la tâche de traitement SageMaker Clarify calcule pour vérifier le biais du modèle, le biais des données ou l'explicabilité du modèle. Pour plus d’informations, consultez Exécutez des tâches de traitement SageMaker Clarify pour l'analyse des biais et l'explicabilité.

Contrôle des comportements d'étape pour la vérification de dérive

L'étape ClarifyCheck nécessite les deux indicateurs booléens suivants pour le contrôle de son comportement :

  • skip_check : ce paramètre indique si la vérification de dérive par rapport à la référence précédente est ignorée ou non. S'il est défini sur False, la référence précédente du type de contrôle configuré doit être disponible.

  • register_new_baseline : ce paramètre indique si une référence recalculée est accessible via la propriété BaselineUsedForDriftCheckConstraints de l'étape. S'il est défini sur False, la référence précédente du type de contrôle configuré doit également être disponible. Vous pouvez y accéder via la propriété BaselineUsedForDriftCheckConstraints.

Pour plus d’informations, consultez Calcul de référence, détection de la dérive et cycle de vie avec Amazon SageMaker Model Building Pipelines ClarifyCheck et QualityCheck étapes.

Utilisation des références

Vous pouvez éventuellement spécifier le model_package_group_name pour localiser la référence existante et l'étape ClarifyCheck extrait les DriftCheckBaselines sur le dernier package de modèles approuvé dans le groupe de modèles. Vous pouvez également fournir une référence précédente via le paramètre supplied_baseline_constraints. Si vous spécifiez le model_package_group_name et les supplied_baseline_constraints, l'étape ClarifyCheck utilise la référence spécifiée par le paramètre supplied_baseline_constraints.

Pour plus d'informations sur l'utilisation des exigences relatives aux ClarifyCheck étapes, consultez le document sagemaker.workflow.steps. ClarifyCheckAccédez au SageMaker SageMakerSDK Amazon pour Python. Pour un bloc-notes Amazon SageMaker Studio Classic expliquant comment utiliser ClarifyCheck Step dans SageMaker Pipelines, consultez sagemaker-pipeline-model-monitor-clarify-steps.ipynb.

Exemple Créer une étape ClarifyCheck pour la vérification du biais de données
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_bias_data_config = DataConfig( s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep']), label=0, dataset_type="text/csv", s3_analysis_config_output_path=data_bias_analysis_cfg_output_path, ) data_bias_config = BiasConfig( label_values_or_threshold=[15.0], facet_name=[8], facet_values_or_threshold=[[0.5]] ) data_bias_check_config = DataBiasCheckConfig( data_config=data_bias_data_config, data_bias_config=data_bias_config, )h data_bias_check_step = ClarifyCheckStep( name="DataBiasCheckStep", clarify_check_config=data_bias_check_config, check_job_config=check_job_config, skip_check=False, register_new_baseline=False supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json", model_package_group_name="MyModelPackageGroup" )

QualityCheck Étape

Vous pouvez utiliser l'étape QualityCheck pour effectuer des suggestions de référence et des vérifications de dérive par rapport à une référence précédente pour la qualité des données ou la qualité du modèle dans un pipeline. Vous pouvez ensuite générer et enregistrer vos références avec le model.register() et transmettre la sortie de cette méthode à Étape du modèle en utilisant step_args. Model Monitor peut utiliser ces références pour la vérification de dérive pour les points de terminaison de votre modèle, de sorte que vous n'avez pas besoin d'effectuer une suggestion de référence séparément. L'étape QualityCheck peut également extraire des références pour la vérification de dérive à partir du registre des modèles. Cette QualityCheck étape tire parti du conteneur prédéfini Amazon SageMaker Model Monitor, qui possède une gamme de fonctionnalités de surveillance des modèles, notamment la suggestion de contraintes, la génération de statistiques et la validation des contraintes par rapport à une base de référence. Pour plus d’informations, consultez Conteneur préfabriqué Amazon SageMaker Model Monitor.

Configuration de l' QualityCheck étape

Vous pouvez configurer l'étape QualityCheck pour effectuer l'un des types de vérification suivants chaque fois qu'il est utilisé dans un pipeline.

  • Vérification de la qualité des données

  • Vérification de la qualité du modèle

Pour ce faire, définissez le paramètre quality_check_config avec l'une des valeurs de type de vérification suivantes :

  • DataQualityCheckConfig

  • ModelQualityCheckConfig

L'étape QualityCheck lance une tâche de traitement qui exécute le conteneur prédéfini Model Monitor et nécessite des configurations dédiées pour la vérification et la tâche de traitement. QualityCheckConfig et CheckJobConfig sont des fonctions d'assistance pour ces configurations qui sont alignées sur la façon dont Model Monitor crée une référence pour la surveillance de la qualité du modèle ou de la qualité des données. Pour plus d'informations sur les suggestions de référence Model Monitor, veuillez consulter Création d'une référence et Création d'une référence de qualité du modèle.

Contrôle des comportements d'étape pour la vérification de dérive

L'étape QualityCheck nécessite les deux indicateurs booléens suivants pour le contrôle de son comportement :

  • skip_check : ce paramètre indique si la vérification de dérive par rapport à la référence précédente est ignorée ou non. S'il est défini sur False, la référence précédente du type de contrôle configuré doit être disponible.

  • register_new_baseline : ce paramètre indique si une référence recalculée est accessible via les propriétés BaselineUsedForDriftCheckConstraints et BaselineUsedForDriftCheckStatistics de l'étape. S'il est défini sur False, la référence précédente du type de contrôle configuré doit également être disponible. Vous pouvez y accéder via les propriétés BaselineUsedForDriftCheckConstraints et BaselineUsedForDriftCheckStatistics.

Pour plus d’informations, consultez Calcul de référence, détection de la dérive et cycle de vie avec Amazon SageMaker Model Building Pipelines ClarifyCheck et QualityCheck étapes.

Utilisation des références

Vous pouvez spécifier une référence précédente directement via les paramètres supplied_baseline_statistics et supplied_baseline_constraints, ou vous pouvez simplement spécifier le model_package_group_name pour que l'étape QualityCheck extraie les DriftCheckBaselines sur le dernier package de modèles approuvé dans le groupe de modèles. Lorsque vous spécifiez le model_package_group_name, les supplied_baseline_constraints et les supplied_baseline_statistics, l'étape QualityCheck utilise la référence spécifiée par les supplied_baseline_constraints et les supplied_baseline_statistics sur le type de vérification de l'étape QualityCheck que êtes en train d'exécuter.

Pour plus d'informations sur l'utilisation des exigences relatives aux QualityCheck étapes, consultez le document sagemaker.workflow.steps. QualityCheckAccédez au SageMaker SageMakerSDK Amazon pour Python. Pour un bloc-notes Amazon SageMaker Studio Classic expliquant comment utiliser QualityCheck Step dans SageMaker Pipelines, consultez sagemaker-pipeline-model-monitor-clarify-steps.ipynb.

Exemple Créer une étape QualityCheck pour la vérification de la qualité des données
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_quality_check_config = DataQualityCheckConfig( baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"), output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep']) ) data_quality_check_step = QualityCheckStep( name="DataQualityCheckStep", skip_check=False, register_new_baseline=False, quality_check_config=data_quality_check_config, check_job_config=check_job_config, supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json", supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json", model_package_group_name="MyModelPackageGroup" )

Étape EMR

Vous pouvez utiliser l'étape EMR Amazon SageMaker Model Building Pipelines pour traiter les étapes Amazon EMR sur un cluster Amazon EMR en cours d'exécution ou demander au pipeline de créer et de gérer un cluster Amazon EMR pour vous. Pour plus d'informations sur Amazon EMR, consultez Bien démarrer avec Amazon EMR.

L'étape EMR nécessite que EMRStepConfig inclue l'emplacement du fichier JAR que doit utiliser le cluster Amazon EMR et tous les arguments à transmettre. Vous fournissez également l'ID du cluster Amazon EMR si vous souhaitez exécuter l'étape sur un cluster EMR en cours d'exécution, ou la configuration du cluster si vous souhaitez que l'étape EMR s'exécute sur un cluster qu'elle crée, gère et résilie pour vous. Les sections suivantes incluent des exemples et des liens vers des exemples de blocs-notes illustrant les deux méthodes.

Note
  • Les étapes EMR exigent que le rôle transmis à votre pipeline ait des autorisations supplémentaires. Vous devez attacher la politique gérée par AWS  : AmazonSageMakerPipelinesIntegrations à votre rôle de pipeline, ou vous assurer que le rôle inclut les autorisations de cette politique.

  • L'étape EMR n'est pas prise en charge sur EMR sans serveur, ni sur Amazon EMR sur EKS.

  • Si vous traitez une étape EMR sur un cluster en cours d'exécution, vous pouvez uniquement utiliser un cluster dans l'un des états suivants : STARTING, BOOTSTRAPPING, RUNNING ou WAITING.

  • Si vous traitez les étapes EMR sur un cluster en cours d'exécution, vous pouvez avoir au maximum 256 étapes EMR dans un état PENDING sur un cluster EMR. Les étapes EMR soumises au-delà de cette limite entraînent l'échec de l'exécution du pipeline. Vous pouvez envisager d'utiliser Politique de nouvelle tentative pour les étapes du pipeline.

  • Vous pouvez spécifier l'ID ou la configuration du cluster, mais pas les deux.

  • L'étape EMR repose sur Amazon EventBridge pour surveiller les modifications de l'étape EMR ou de l'état du cluster. Si vous traitez votre tâche Amazon EMR sur un cluster en cours d'exécution, l'étape EMR utilise la règle SageMakerPipelineExecutionEMRStepStatusUpdateRule pour surveiller son état. Si vous traitez votre tâche sur un cluster créé pour vous par l'étape EMR, l'étape utilise la règle SageMakerPipelineExecutionEMRClusterStatusRule pour surveiller les modifications de l'état du cluster. Si l'une de ces EventBridge règles apparaît dans votre AWS compte, ne la supprimez pas, sinon votre étape EMR risque de ne pas être terminée.

Lancement d'une nouvelle tâche sur un cluster Amazon EMR en cours d'exécution

Si vous souhaitez lancer une nouvelle tâche sur un cluster Amazon EMR en cours d'exécution, vous devez transmettre l'ID du cluster sous forme de chaîne à l'argument cluster_id de EMRStep. L'exemple suivant illustre cette procédure.

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_config = EMRStepConfig( jar="jar-location", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) step_emr = EMRStep ( name="EMRSampleStep", # required cluster_id="j-1ABCDEFG2HIJK", # include cluster_id to use a running cluster step_config=emr_config, # required display_name="My EMR Step", description="Pipeline step to execute EMR job" )

Pour un exemple de bloc-notes qui vous guide à travers un exemple complet, voir SageMaker Pipelines EMR Step With Running EMR Cluster.

Lancement d'une nouvelle tâche sur un nouveau cluster Amazon EMR

Si vous souhaitez lancer une nouvelle tâche sur un nouveau cluster EMRStep créé pour vous, fournissez la configuration de votre cluster sous forme de dictionnaire avec la même structure qu'une demande RunJobFlow. Toutefois, n'incluez pas les champs suivants dans la configuration de votre cluster :

  • [Name]

  • [Steps]

  • [AutoTerminationPolicy]

  • [Instances][KeepJobFlowAliveWhenNoSteps]

  • [Instances][TerminationProtected]

Tous les autres arguments RunJobFlow peuvent être utilisés dans votre configuration de cluster. Pour plus de détails sur la syntaxe des demandes, consultez RunJobFlow.

L'exemple suivant transmet une configuration de cluster à une définition d'étape EMR, qui invite l'étape à lancer une nouvelle tâche sur un nouveau cluster EMR. Dans cet exemple, la configuration du cluster EMR inclut des spécifications pour les nœuds primaires et principaux du cluster EMR. Pour plus d'informations sur les types de nœuds Amazon EMR, consultez Comprendre les types de nœuds : nœuds primaires, principaux et de tâches.

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_step_config = EMRStepConfig( jar="jar-location", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) # include your cluster configuration as a dictionary emr_cluster_config = { "Applications": [ { "Name": "Spark", } ], "Instances":{ "InstanceGroups":[ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.2xlarge" }, { "InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "m5.2xlarge" } ] }, "BootstrapActions":[], "ReleaseLabel": "emr-6.6.0", "JobFlowRole": "job-flow-role", "ServiceRole": "service-role" } emr_step = EMRStep( name="emr-step", cluster_id=None, display_name="emr_step", description="MyEMRStepDescription", step_config=emr_step_config, cluster_config=emr_cluster_config )

Pour un exemple de bloc-notes qui vous guide à travers un exemple complet, voir SageMaker Pipelines EMR Step With Cluster Lifecycle Management.

Notebook Job Step

Utilisez a NotebookJobStep pour exécuter votre SageMaker Notebook Job de manière non interactive en tant qu'étape du pipeline. Pour plus d'informations sur SageMaker Notebook Jobs, consultezSageMaker Emplois sur ordinateur portable.

A NotebookJobStep nécessite au minimum un bloc-notes d'entrée, une URI d'image et un nom de noyau. Pour plus d'informations sur les exigences relatives aux étapes de Notebook Job et sur les autres paramètres que vous pouvez définir pour personnaliser votre étape, consultez sagemaker.workflow.steps. NotebookJobÉtape.

L'exemple suivant utilise un minimum d'arguments pour définir unNotebookJobStep.

from sagemaker.workflow.notebook_job_step import NotebookJobStep notebook_job_step = NotebookJobStep( input_notebook=input_notebook, image_uri=image_uri, kernel_name=kernel_name )

Votre étape de NotebookJobStep pipeline est traitée comme une tâche de SageMaker carnet. Vous pouvez donc suivre l'état d'exécution dans le tableau de bord des tâches de bloc-notes de l'interface utilisateur de Studio Classic si vous incluez des balises spécifiques dans l'tagsargument. Pour plus de détails sur les balises à inclure, consultezConsultez les tâches de votre bloc-notes dans le tableau de bord de l'interface utilisateur de Studio.

De plus, si vous planifiez votre tâche de bloc-notes à l'aide du SDK SageMaker Python, vous ne pouvez spécifier que certaines images pour exécuter votre tâche de bloc-notes. Pour plus d’informations, consultez Contraintes d'image pour les SageMaker tâches de bloc-notes du SDK Python.

Étape de Fail

Vous utilisez a FailStep pour arrêter l'exécution d'un Amazon SageMaker Model Building Pipelines lorsqu'une condition ou un état souhaité n'est pas atteint et pour marquer l'exécution de ce pipeline comme ayant échoué. L'FailStep vous permet également de saisir un message d'erreur personnalisé indiquant la cause de l'échec d'exécution du pipeline.

Note

Lorsqu'une FailStep et les autres étapes du pipeline s'exécutent simultanément, le pipeline ne se termine pas tant que toutes les étapes simultanées ne sont pas terminées.

Limites d'utilisation pour FailStep

  • Vous ne pouvez pas ajouter d'FailStep vers la liste DependsOn des autres étapes. Pour plus d’informations, consultez Dépendance personnalisée entre étapes.

  • Les autres étapes ne peuvent pas faire référence à l'FailStep. C'est toujours la dernière étape de l'exécution d'un pipeline.

  • Vous ne pouvez pas réessayer une exécution de pipeline se terminant par une FailStep.

Vous pouvez créer l'ErrorMessage FailStep sous la forme d'une chaîne de texte statique. Vous pouvez également utiliser les Paramètres du pipeline, une opération de jointure, ou autre Propriétés de l'étape pour créer un message d'erreur plus informatif.

L'exemple d'extrait de code suivant utilise une FailStep avec un ErrorMessage configuré avec les paramètres du pipeline et une opération de Join.

from sagemaker.workflow.fail_step import FailStep from sagemaker.workflow.functions import Join from sagemaker.workflow.parameters import ParameterInteger mse_threshold_param = ParameterInteger(name="MseThreshold", default_value=5) step_fail = FailStep( name="AbaloneMSEFail", error_message=Join( on=" ", values=["Execution failed due to MSE >", mse_threshold_param] ), )

Propriétés de l'étape

L'attribut properties est utilisé pour ajouter des dépendances de données entre les étapes du pipeline. Ces dépendances de données sont ensuite utilisées par les SageMaker pipelines pour construire le DAG à partir de la définition du pipeline. Ces propriétés peuvent être référencées en tant que valeurs d'espace réservé et sont résolues lors de l'exécution.

L'propertiesattribut d'une étape SageMaker Pipelines correspond à l'objet renvoyé par un Describe appel pour le type de SageMaker tâche correspondant. Pour chaque type de tâche, l'appel Describe renvoie l'objet de réponse suivant :

Pour vérifier quelles propriétés peuvent être référencées pour chaque type d'étape lors de la création de dépendances de données, consultez Data Dependency - Property Reference dans le SDK Amazon SageMaker Python.

Parallélisme d'étapes

Lorsqu'une étape ne dépend d'aucune autre étape, elle est exécutée immédiatement lors de l'exécution du pipeline. Toutefois, l'exécution en parallèle d'un trop grand nombre d'étapes du pipeline peut rapidement épuiser les ressources disponibles. Contrôlez le nombre d'étapes simultanées pour une exécution de pipeline avec ParallelismConfiguration.

L'exemple suivant utilise ParallelismConfiguration pour définir la limite des étapes simultanées à cinq.

pipeline.create( parallelism_config=ParallelismConfiguration(5), )

Dépendance des données entre étapes

Vous définissez la structure de votre DAG en spécifiant les relations des données entre les étapes. Pour créer des dépendances de données entre les étapes, transmettez les propriétés d'une étape comme entrée à une autre étape du pipeline. L'étape recevant l'entrée n'est démarrée qu'après l'étape fournissant l'entrée a terminé l'exécution.

Une dépendance de données utilise une JsonPath notation au format suivant. Ce format traverse le fichier de propriétés JSON, ce qui signifie que vous pouvez ajouter autant d'instances <propiété> que nécessaire pour atteindre la propriété imbriquée souhaitée dans le fichier. Pour plus d'informations sur la JsonPath notation, consultez le JsonPath dépôt.

<step_name>.properties.<property>.<property>

Ce qui suit montre comment spécifier un compartiment Amazon S3 à l'aide de la propriété ProcessingOutputConfig d'une étape de traitement.

step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri

Pour créer la dépendance des données, transmettez le compartiment à une étape d'entraînement comme suit.

from sagemaker.workflow.pipeline_context import PipelineSession sklearn_train = SKLearn(..., sagemaker_session=PipelineSession()) step_train = TrainingStep( name="CensusTrain", step_args=sklearn_train.fit(inputs=TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train_data"].S3Output.S3Uri )) )

Pour vérifier quelles propriétés peuvent être référencées pour chaque type d'étape lors de la création de dépendances de données, consultez Data Dependency - Property Reference dans le SDK Amazon SageMaker Python.

Dépendance personnalisée entre étapes

Lorsque vous spécifiez une dépendance aux données, SageMaker Pipelines fournit la connexion de données entre les étapes. Une étape peut également accéder aux données d'une étape précédente sans utiliser directement les SageMaker pipelines. Dans ce cas, vous pouvez créer une dépendance personnalisée qui indique à SageMaker Pipelines de ne pas démarrer une étape avant la fin de l'exécution d'une autre étape. Vous créez une dépendance personnalisée en spécifiant l'attribut DependsOn d'une étape.

À titre d'exemple, ce qui suit définit une étape C qui démarre seulement après que les deux étapes A et B terminent leur exécution.

{ 'Steps': [ {'Name':'A', ...}, {'Name':'B', ...}, {'Name':'C', 'DependsOn': ['A', 'B']} ] }

SageMaker Pipelines émet une exception de validation si la dépendance crée une dépendance cyclique.

L'exemple suivant crée une étape d'entraînement qui démarre après l'exécution d'une étape de traitement.

processing_step = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step])

L'exemple suivant crée une étape d'entraînement qui ne démarre pas tant que l'exécution de deux étapes de traitement différentes n'est pas terminée.

processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step_1, processing_step_2])

Ce qui suit fournit un autre moyen de créer la dépendance personnalisée.

training_step.add_depends_on([processing_step_1]) training_step.add_depends_on([processing_step_2])

L'exemple suivant crée une étape d'entraînement qui reçoit les entrées d'une étape de traitement et attend que l'exécution d'une autre étape de traitement se termine.

processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep( ..., inputs=TrainingInput( s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[ "train_data" ].S3Output.S3Uri ) training_step.add_depends_on([processing_step_2])

L'exemple suivant montre comment extraire une liste de chaînes des dépendances personnalisées d'une étape.

custom_dependencies = training_step.depends_on

Utiliser une image personnalisée dans une étape

Vous pouvez utiliser n'importe laquelle des images de conteneur SageMaker Deep Learning disponibles lorsque vous créez une étape dans votre pipeline.

Vous pouvez également utiliser votre propre conteneur avec des étapes de pipeline. Comme vous ne pouvez pas créer d'image depuis Amazon SageMaker Studio Classic, vous devez créer votre image à l'aide d'une autre méthode avant de l'utiliser avec Amazon SageMaker Model Building Pipelines.

Pour utiliser votre propre conteneur lors de la création des étapes pour votre pipeline, incluez l'URI de l'image dans la définition de l'estimateur. Pour plus d'informations sur l'utilisation de votre propre conteneur avec SageMaker, consultez la section Utilisation de conteneurs Docker avec SageMaker.