Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Étapes du pipeline de modélisation
SageMaker Les pipelines sont composés d'étapes. Ces étapes définissent les actions effectuées par le pipeline et les relations entre les étapes utilisant les propriétés.
Rubriques
Types d'étapes
La section suivante décrit les exigences de chaque type d'étape et fournit un exemple d'implémentation de l'étape. Ce ne sont pas des implémentations fonctionnelles, car elles ne fournissent pas les ressources et les entrées nécessaires. Pour obtenir un tutoriel qui met en œuvre ces étapes, veuillez consulter Création et gestion de SageMaker pipelines.
Note
Vous pouvez également créer une étape à partir de votre code d'apprentissage automatique local en la convertissant en étape SageMaker Pipelines avec le @step
décorateur. Pour plus d’informations, consultez décorateur @step.
Amazon SageMaker Model Building Pipelines prend en charge les types d'étapes suivants :
décorateur @step
Vous pouvez créer une étape à partir du code d'apprentissage automatique local à l'aide du @step
décorateur. Après avoir testé votre code, vous pouvez convertir la fonction en étape de SageMaker pipeline en l'annotant à l'aide du @step
décorateur. SageMaker Pipelines crée et exécute un pipeline lorsque vous transmettez la sortie de la fonction @step
-decorated en tant qu'étape à votre pipeline. Vous pouvez également créer un pipeline DAG en plusieurs étapes qui inclut une ou plusieurs fonctions @step
décorées ainsi que des étapes de SageMaker pipeline traditionnelles. Pour plus de détails sur la création d'une étape avec le @step
décorateur, consultezCode ift-and-shift Python L avec le décorateur @step.
Étape de traitement
Utilisez une étape de traitement pour créer une tâche de traitement pour le traitement des données. Pour plus d'informations sur les tâches de traitement, veuillez consulter Données de traitement et Modèles d'évaluation.
Une étape de traitement nécessite un processeur, un script Python qui définit le code de traitement, les sorties pour le traitement et les arguments de tâche. L'exemple suivant montre comment créer une définition ProcessingStep
.
from sagemaker.sklearn.processing import SKLearnProcessor sklearn_processor = SKLearnProcessor(framework_version='1.0-1', role=
<role>
, instance_type='ml.m5.xlarge', instance_count=1)
from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep inputs = [ ProcessingInput(source=
<input_data>
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] step_process = ProcessingStep( name="AbaloneProcess", step_args = sklearn_processor.run(inputs=inputs, outputs=outputs, code="abalone/preprocessing.py") )
Transmission des paramètres d'exécution
L'exemple suivant montre comment transmettre des paramètres d'exécution d'un PySpark processeur à unProcessingStep
.
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.spark.processing import PySparkProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep pipeline_session = PipelineSession() pyspark_processor = PySparkProcessor( framework_version='2.4', role=
<role>
, instance_type='ml.m5.xlarge', instance_count=1, sagemaker_session=pipeline_session, ) step_args = pyspark_processor.run( inputs=[ProcessingInput(source=<input_data>
, destination="/opt/ml/processing/input"),], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="preprocess.py", arguments=None, ) step_process = ProcessingStep( name="AbaloneProcess", step_args=step_args, )
Pour plus d'informations sur les exigences relatives aux étapes de traitement, consultez le document sagemaker.workflow.steps. ProcessingStep
Étape d'entraînement
Vous utilisez une étape d'entraînement pour créer une tâche d'entraînement afin d'entraîner un modèle. Pour plus d'informations sur les métiers de formation, consultez Train a Model with Amazon SageMaker.
Une étape d'entraînement nécessite un estimateur, ainsi que des entrées de données d'entraînement et de validation. L'exemple suivant montre comment créer une définition TrainingStep
. Pour plus d'informations sur les exigences relatives aux étapes de formation, consultez le document sagemaker.workflow.steps. TrainingStep
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep from sagemaker.xgboost.estimator import XGBoost pipeline_session = PipelineSession() xgb_estimator = XGBoost(..., sagemaker_session=pipeline_session) step_args = xgb_estimator.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) } ) step_train = TrainingStep( name="TrainAbaloneModel", step_args=step_args, )
Étape de réglage
Vous utilisez une étape de réglage pour créer une tâche de réglage d'hyperparamètres, également appelé optimisation des hyperparamètres (HPO). Une tâche de réglage d'hyperparamètres exécute plusieurs tâches d'entraînement, chacune produisant une version de modèle. Pour plus d'informations sur le réglage d'hyperparamètres, veuillez consulter Effectuez le réglage automatique du modèle avec SageMaker.
La tâche de réglage est associée à l' SageMaker expérience pour le pipeline, les tâches de formation étant créées à titre d'essais. Pour plus d’informations, consultez Intégration d'Experiments.
Une étape de réglage nécessite des entrées HyperparameterTunerwarm_start_config
du HyperparameterTuner
. Pour plus d'informations sur le réglage d'hyperparamètres et le démarrage à chaud, veuillez consulter Exécution d'une tâche de réglage des hyperparamètres avec démarrage à chaud.
Vous utilisez la méthode get_top_model_s3_uri
Important
Les étapes de réglage ont été introduites dans le SDK Amazon SageMaker Python v2.48.0 et dans Amazon SageMaker Studio Classic v3.8.0. Vous devez mettre à jour Studio Classic avant d'utiliser une étape de réglage, sinon le DAG du pipeline ne s'affichera pas. Pour mettre à jour Studio Classic, voirArrêter et mettre à jour SageMaker Studio Classic.
L'exemple suivant montre comment créer une définition TuningStep
.
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.tuner import HyperparameterTuner from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TuningStep tuner = HyperparameterTuner(..., sagemaker_session=PipelineSession()) step_tuning = TuningStep( name = "HPTuning", step_args = tuner.fit(inputs=TrainingInput(s3_data="s3://
my-bucket/my-data
")) )
Obtenir la meilleure version de modèle
L'exemple suivant montre comment obtenir la meilleure version de modèle à partir de la tâche de réglage à l'aide de la méthode get_top_model_s3_uri
. Tout au plus, les 50 versions les plus performantes sont disponibles classées par HyperParameterTuningJobobjectif. L'argument top_k
est un index dans les versions, où top_k=0
est la version la plus performante et top_k=49
est la version la moins performante.
best_model = Model( image_uri=image_uri, model_data=step_tuning.get_top_model_s3_uri( top_k=0, s3_bucket=sagemaker_session.default_bucket() ), ... )
Pour plus d'informations sur les exigences relatives aux étapes de réglage, consultez le document sagemaker.workflow.steps. TuningStep
Étape AutoML
Utilisez l'API AutoML
Note
Actuellement, l'étape AutoML ne prend en charge que le mode d'entraînement d'assemblage.
L'exemple suivant montre comment créer une définition avec AutoMLStep
.
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.automl_step import AutoMLStep pipeline_session = PipelineSession() auto_ml = AutoML(..., role="
<role>
", target_attribute_name="my_target_attribute_name
", mode="ENSEMBLING
", sagemaker_session=pipeline_session) input_training = AutoMLInput( inputs="s3://my-bucket/my-training-data
", target_attribute_name="my_target_attribute_name
", channel_type="training", ) input_validation = AutoMLInput( inputs="s3://my-bucket/my-validation-data
", target_attribute_name="my_target_attribute_name
", channel_type="validation", ) step_args = auto_ml.fit( inputs=[input_training, input_validation] ) step_automl = AutoMLStep( name="AutoMLStep", step_args=step_args, )
Obtenir la meilleure version de modèle
L'étape AutoML entraîne automatiquement plusieurs modèles candidats. Vous pouvez obtenir le modèle avec la meilleure métrique objective à partir de la tâche AutoML à l'aide de la méthode get_best_auto_ml_model
et d'un role
IAM pour accéder aux artefacts du modèle comme suit.
best_model = step_automl.get_best_auto_ml_model(
role=<role>
)
Pour plus d'informations, consultez l'étape AutoML
Étape du modèle
Utilisez un ModelStep
pour créer ou enregistrer un SageMaker modèle. Pour plus d'informations sur les ModelStep
exigences, consultez le document sagemaker.workflow.model_step. ModelStep
Création d’un modèle
Vous pouvez utiliser un ModelStep
pour créer un SageMaker modèle. A ModelStep
nécessite des artefacts de modèle et des informations sur le type d' SageMaker instance que vous devez utiliser pour créer le modèle. Pour plus d'informations sur SageMaker les modèles, consultez Train a Model with Amazon SageMaker.
L'exemple suivant montre comment créer une définition ModelStep
.
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.model import Model from sagemaker.workflow.model_step import ModelStep step_train = TrainingStep(...) model = Model( image_uri=pytorch_estimator.training_image_uri(), model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=PipelineSession(), role=role, ) step_model_create = ModelStep( name="MyModelCreationStep", step_args=model.create(instance_type="ml.m5.xlarge"), )
Enregistrement d'un modèle
Vous pouvez utiliser a ModelStep
pour enregistrer un sagemaker.model.Model
ou un dans sagemaker.pipeline.PipelineModel
le registre des SageMaker modèles Amazon. Un PipelineModel
représente un pipeline d'inférence, qui est un modèle composé d'une séquence linéaire de conteneurs qui traitent les demandes d'inférence. Pour savoir comment enregistrer un modèle, veuillez consulter Enregistrer et déployer des modèles avec Model Registry.
L'exemple suivant montre comment créer une ModelStep
qui enregistre un PipelineModel
.
import time from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel pipeline_session = PipelineSession() code_location = 's3://{0}/{1}/code'.format(
bucket_name
, prefix) sklearn_model = SKLearnModel( model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=pipeline_session, py_version='py3' ) xgboost_model = XGBoostModel( model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=pipeline_session, role=role ) from sagemaker.workflow.model_step import ModelStep from sagemaker import PipelineModel pipeline_model = PipelineModel( models=[sklearn_model, xgboost_model], role=role,sagemaker_session=pipeline_session, ) register_model_step_args = pipeline_model.register( content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', ) step_model_registration = ModelStep( name="AbaloneRegisterModel", step_args=register_model_step_args, )
CreateModel Étape
Important
Nous vous recommandons Étape du modèle de l'utiliser pour créer des modèles à partir de la version 2.90.0 du SDK Python SageMaker . CreateModelStep
continuera de fonctionner dans les versions précédentes du SDK SageMaker Python, mais n'est plus activement pris en charge.
Vous utilisez une CreateModel
étape pour créer un SageMaker modèle. Pour plus d'informations sur SageMaker les modèles, consultez Train a Model with Amazon SageMaker.
Une étape de création de modèle nécessite des artefacts de modèle et des informations sur le type d' SageMaker instance que vous devez utiliser pour créer le modèle. L'exemple suivant montre comment créer une définition d'étape CreateModel
. Pour plus d'informations sur les exigences relatives aux CreateModel
étapes, consultez le document sagemaker.workflow.steps. CreateModelDocumentation des étapes
from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=best_model, inputs=inputs )
RegisterModel Étape
Important
Nous vous recommandons Étape du modèle de l'utiliser pour enregistrer des modèles à partir de la version 2.90.0 du SDK Python SageMaker . RegisterModel
continuera de fonctionner dans les versions précédentes du SDK SageMaker Python, mais n'est plus activement pris en charge.
Vous utilisez une RegisterModel
étape pour enregistrer un SageMaker.Model.Model ou un sagemaker.pipeline.PipelineModel
représente un pipeline d'inférence, qui est un modèle composé d'une séquence linéaire de conteneurs qui traitent les demandes d'inférence.
Pour savoir comment enregistrer un modèle, veuillez consulter Enregistrer et déployer des modèles avec Model Registry. Pour plus d'informations sur les exigences relatives aux RegisterModel
étapes, consultez le document sagemaker.workflow.step_collections. RegisterModel
L'exemple suivant montre comment créer une étape RegisterModel
qui enregistre un PipelineModel
.
import time from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel code_location = 's3://{0}/{1}/code'.format(
bucket_name
, prefix) sklearn_model = SKLearnModel(model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/
', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=sagemaker_session, py_version='py3') xgboost_model = XGBoostModel(model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/
', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=sagemaker_session, role=role) from sagemaker.workflow.step_collections import RegisterModel from sagemaker import PipelineModel pipeline_model = PipelineModel(models=[sklearn_model,xgboost_model],role=role,sagemaker_session=sagemaker_session) step_register = RegisterModel( name="AbaloneRegisterModel
", model=pipeline_model, content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium
", "ml.m5.xlarge
"], transform_instances=["ml.m5.xlarge
"], model_package_group_name='sipgroup
', )
Si le model
n'est pas fourni, l'étape RegisterModel nécessite un estimateur, comme illustré dans l'exemple suivant.
from sagemaker.workflow.step_collections import RegisterModel step_register = RegisterModel( name="
AbaloneRegisterModel
", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge
"], transform_instances=["ml.m5.xlarge
"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )
Étape de transformation
Pour exécuter des inférences sur un jeu de données entier, vous utilisez une étape de transformation pour une transformation par lots. Pour plus d'informations sur la transformation par lots, veuillez consulter Exécution de transformations par lots avec des pipelines d'inférence.
Une étape de transformation nécessite un transformateur et les données sur lesquelles exécuter la transformation par lots. L'exemple suivant montre comment créer une définition d'étape Transform
. Pour plus d'informations sur les exigences relatives aux Transform
étapes, consultez le document sagemaker.workflow.steps. TransformStep
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.transformer import Transformer from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep transformer = Transformer(..., sagemaker_session=PipelineSession()) step_transform = TransformStep( name="AbaloneTransform", step_args=transformer.transform(data="s3://
my-bucket/my-data
"), )
Étape de condition
Vous utilisez une étape de condition pour évaluer la condition des propriétés de l'étape afin d'évaluer quelle action doit être effectuée ensuite dans le pipeline.
Une étape de condition nécessite une liste de conditions, une liste d'étapes à exécuter si la condition a pour valeur true
et une liste des étapes à exécuter si la condition a pour valeur false
. L'exemple suivant montre comment créer une définition ConditionStep
.
Limites
-
SageMaker Pipelines ne prend pas en charge l'utilisation d'étapes conditionnelles imbriquées. Vous ne pouvez pas transmettre une étape de condition comme entrée pour une autre étape de condition.
-
Une étape de condition ne peut pas utiliser des étapes identiques dans les deux branches. Si vous avez besoin de la même fonctionnalité d'étape dans les deux branches, dupliquez l'étape et donnez-lui un nom différent.
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 ) step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[] )
Pour plus d'informations sur les ConditionStep
exigences, consultez le document sagemaker.workflow.condition_step. ConditionStep
Étape de rappel
Vous utilisez une Callback
étape pour intégrer dans votre flux de travail des processus et AWS
des services supplémentaires qui ne sont pas directement fournis par Amazon SageMaker Model Building Pipelines. Lorsqu'une étape de Callback
s'exécute, la procédure suivante se produit :
-
SageMaker Pipelines envoie un message à une file d'attente Amazon Simple Queue Service (Amazon SQS) spécifiée par le client. Le message contient un jeton SageMaker généré par Pipelines et une liste de paramètres d'entrée fournie par le client. Après avoir envoyé le message, SageMaker Pipelines attend une réponse du client.
-
Le client récupère le message dans la file d'attente Amazon SQS et démarre son processus personnalisé.
-
Lorsque le processus est terminé, le client appelle l'une des API suivantes et envoie le jeton généré par les SageMaker pipelines :
-
SendPipelineExecutionStepSuccès, accompagné d'une liste de paramètres de sortie
-
SendPipelineExecutionStepDéfaillance, accompagnée d'une raison de défaillance
-
-
L'appel d'API SageMaker oblige les pipelines à poursuivre le processus de pipeline ou à échouer.
Pour plus d'informations sur les exigences relatives aux Callback
étapes, consultez le document sagemaker.workflow.callback_step. CallbackStep
Important
Callback
les étapes ont été introduites dans Amazon SageMaker Python SDK v2.45.0 et Amazon SageMaker Studio Classic v3.6.2. Vous devez mettre à jour Studio Classic avant d'utiliser une Callback
étape, sinon le DAG du pipeline ne s'affichera pas. Pour mettre à jour Studio Classic, voirArrêter et mettre à jour SageMaker Studio Classic.
L'exemple suivant illustre une mise en œuvre de la procédure précédente.
from sagemaker.workflow.callback_step import CallbackStep step_callback = CallbackStep( name="MyCallbackStep", sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue", inputs={...}, outputs=[...] ) callback_handler_code = ' import boto3 import json def handler(event, context): sagemaker_client=boto3.client("sagemaker") for record in event["Records"]: payload=json.loads(record["body"]) token=payload["token"] # Custom processing # Call SageMaker to complete the step sagemaker_client.send_pipeline_execution_step_success( CallbackToken=token, OutputParameters={...} ) '
Note
Paramètres de sortie pour CallbackStep
ne doit pas être imbriqué. Par exemple, si vous utilisez un dictionnaire imbriqué comme paramètre de sortie, le dictionnaire est traité comme une chaîne unique (par ex. {"output1": "{\"nested_output1\":\"my-output\"}"}
). Si vous fournissez une valeur imbriquée, lorsque vous essayez de faire référence à un paramètre de sortie particulier, une erreur client non réessayable est renvoyée.
Comportement d'arrêt
Un processus de pipeline ne s'arrête pas lorsqu'une étape de Callback
est en cours d'exécution.
Lorsque vous appelez StopPipelineExecution sur un processus de pipeline comportant une Callback
étape en cours d'exécution, SageMaker Pipelines envoie un message Amazon SQS supplémentaire à la file d'attente SQS spécifiée. Le corps du message SQS contient un champ Status (Statut), qui est défini sur Stopping
. L'exemple suivant montre le corps d'un message SQS.
{ "token": "26vcYbeWsZ", "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a", "arguments": { "number": 5, "stringArg": "some-arg", "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv" }, "status": "Stopping" }
Vous devez ajouter une logique à votre client de message Amazon SQS pour effectuer toutes les actions nécessaires (par exemple, le nettoyage des ressources) dès la réception du message, suivi d'un appel à SendPipelineExecutionStepSuccess
ou SendPipelineExecutionStepFailure
.
Ce n'est que lorsque SageMaker Pipelines reçoit l'un de ces appels qu'il arrête le processus de pipeline.
Étape Lambda
Vous utilisez une étape Lambda pour exécuter une AWS Lambda fonction. Vous pouvez exécuter une fonction Lambda existante ou SageMaker créer et exécuter une nouvelle fonction Lambda. Pour un bloc-notes expliquant comment utiliser une étape Lambda dans un pipeline, consultez SageMaker sagemaker-pipelines-lambda-step.ipynb.
Important
Les étapes Lambda ont été introduites dans le SDK Amazon SageMaker Python v2.51.0 et dans Amazon SageMaker Studio Classic v3.9.1. Vous devez mettre à jour Studio Classic avant d'utiliser une étape Lambda, sinon le DAG du pipeline ne s'affichera pas. Pour mettre à jour Studio Classic, voirArrêter et mettre à jour SageMaker Studio Classic.
SageMaker fournit la classe SageMaker.Lambda_Helper.Lambda pour créer, mettre à jour, invoquer et supprimer des fonctions LambdaLambda
porte la signature suivante.
Lambda( function_arn, # Only required argument to invoke an existing Lambda function # The following arguments are required to create a Lambda function: function_name, execution_role_arn, zipped_code_dir, # Specify either zipped_code_dir and s3_bucket, OR script s3_bucket, # S3 bucket where zipped_code_dir is uploaded script, # Path of Lambda function script handler, # Lambda handler specified as "lambda_script.lambda_handler" timeout, # Maximum time the Lambda function can run before the lambda step fails ... )
Le sagemaker.workflow.lambda_step. LambdaSteplambda_func
argument de typeLambda
. Pour appeler une fonction Lambda existante, la seule exigence est de fournir l'Amazon Resource Name (ARN) de la fonction à function_arn
. Si vous ne définissez aucune valeur pour function_arn
, vous devez spécifier handler
et l'un des éléments suivants :
-
zipped_code_dir
– Chemin de la fonction Lambda zippées3_bucket
– Compartiment Amazon S3 oùzipped_code_dir
doit être téléchargé -
script
– Chemin d'accès du fichier script de la fonction Lambda
L'exemple suivant montre comment créer une définition d'étape Lambda
qui appelle une fonction Lambda existante.
from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda" ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )
L'exemple suivant montre comment créer une définition d'étape Lambda
qui appelle une fonction Lambda existante.
from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_name="split-dataset-lambda", execution_role_arn=execution_role_arn, script="lambda_script.py", handler="lambda_script.lambda_handler", ... ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )
Entrées et sorties
Si votre fonction Lambda
a des entrées ou des sorties, elles doivent également être définies dans votre étape Lambda
.
Note
Les paramètres d'entrée et de sortie ne doivent pas être imbriqués. Par exemple, si vous utilisez un dictionnaire imbriqué comme paramètre de sortie, le dictionnaire est traité comme une chaîne unique (par ex. {"output1": "{\"nested_output1\":\"my-output\"}"}
). Si vous fournissez une valeur imbriquée et que vous essayez d'y faire référence ultérieurement, une erreur client non réessayable est renvoyée.
Lors de la définition de l'étape Lambda
, inputs
doit être un dictionnaire de paires clé-valeur. Chaque valeur du dictionnaire inputs
doit être de type primitif (chaîne, entier ou flottante). Les objets imbriqués ne sont pas pris en charge. Si elle n'est pas définie, la valeur inputs
est définie par défaut sur None
.
La valeur outputs
doit être une liste de clés. Ces clés font référence à un dictionnaire défini dans la sortie de la fonction Lambda
. Comme inputs
, ces clés doivent être de type primitif et les objets imbriqués ne sont pas pris en charge.
Délai d'expiration et comportement d'arrêt
La classe Lambda
a un argument timeout
qui spécifie la durée maximale d'exécution de la fonction Lambda. La valeur par défaut est de 120 secondes, avec une valeur maximum de 10 minutes. Si la fonction Lambda est en cours d'exécution lorsque le délai d'expiration est atteint, l'étape Lambda échoue. Cependant, la fonction Lambda continue de s'exécuter.
Un processus de pipeline ne peut pas être arrêté pendant qu'une étape Lambda est en cours d'exécution, car la fonction Lambda invoquée par l'étape Lambda ne peut pas être arrêtée. Si vous tentez d'arrêter le processus pendant que la fonction Lambda est en cours d'exécution, le pipeline attend que la fonction Lambda se termine ou que le délai d'expiration soit atteint, selon la première éventualité, puis s'arrête. Si la fonction Lambda se termine, le statut de processus du pipeline est Stopped
. Si le délai d'expiration est atteint, le statut de processus du pipeline est Failed
.
ClarifyCheck Étape
Vous pouvez utiliser l'étape ClarifyCheck
afin d'effectuer des vérifications de dérive de référence par rapport aux références précédentes pour l'analyse de biais et l'explicabilité de modèle. Vous pouvez ensuite générer et enregistrer vos références avec le model.register()
et transmettre la sortie de cette méthode à Étape du modèle en utilisant step_args
. Ces lignes de base pour le contrôle de dérive peuvent être utilisées par Amazon SageMaker Model Monitor pour les points de terminaison de votre modèle, de sorte que vous n'avez pas besoin de faire une suggestion de référence séparément. L'étape ClarifyCheck
peut également extraire des références pour la vérification de dérive à partir du registre des modèles. Cette ClarifyCheck
étape s'appuie sur le conteneur prédéfini Amazon SageMaker Clarify qui fournit une gamme de fonctionnalités de surveillance des modèles, notamment la suggestion de contraintes et la validation des contraintes par rapport à une référence donnée. Pour plus d’informations, consultez Commencez avec un conteneur SageMaker Clarify.
Configuration de l' ClarifyCheck étape
Vous pouvez configurer l'étape ClarifyCheck
pour effectuer l'un des types de vérification suivants chaque fois qu'il est utilisé dans un pipeline.
-
Vérification de biais des données
-
Vérification de biais de modèle
-
Vérification d'explicabilité de modèle
Pour ce faire, définissez le paramètre clarify_check_config
avec l'une des valeurs de type de vérification suivantes :
-
DataBiasCheckConfig
-
ModelBiasCheckConfig
-
ModelExplainabilityCheckConfig
L'ClarifyCheck
étape lance une tâche de traitement qui exécute le conteneur SageMaker prédéfini Clarify et nécessite des configurations dédiées pour la vérification et la tâche de traitement. ClarifyCheckConfig
et CheckJobConfig
sont des fonctions d'assistance pour ces configurations qui correspondent à la façon dont la tâche de traitement SageMaker Clarify calcule pour vérifier le biais du modèle, le biais des données ou l'explicabilité du modèle. Pour plus d’informations, consultez Exécutez des tâches de traitement SageMaker Clarify pour l'analyse des biais et l'explicabilité.
Contrôle des comportements d'étape pour la vérification de dérive
L'étape ClarifyCheck
nécessite les deux indicateurs booléens suivants pour le contrôle de son comportement :
-
skip_check
: ce paramètre indique si la vérification de dérive par rapport à la référence précédente est ignorée ou non. S'il est défini surFalse
, la référence précédente du type de contrôle configuré doit être disponible. -
register_new_baseline
: ce paramètre indique si une référence recalculée est accessible via la propriétéBaselineUsedForDriftCheckConstraints
de l'étape. S'il est défini surFalse
, la référence précédente du type de contrôle configuré doit également être disponible. Vous pouvez y accéder via la propriétéBaselineUsedForDriftCheckConstraints
.
Pour plus d’informations, consultez Calcul de référence, détection de la dérive et cycle de vie avec Amazon SageMaker Model Building Pipelines ClarifyCheck et QualityCheck étapes.
Utilisation des références
Vous pouvez éventuellement spécifier le model_package_group_name
pour localiser la référence existante et l'étape ClarifyCheck
extrait les DriftCheckBaselines
sur le dernier package de modèles approuvé dans le groupe de modèles. Vous pouvez également fournir une référence précédente via le paramètre supplied_baseline_constraints
. Si vous spécifiez le model_package_group_name
et les supplied_baseline_constraints
, l'étape ClarifyCheck
utilise la référence spécifiée par le paramètre supplied_baseline_constraints
.
Pour plus d'informations sur l'utilisation des exigences relatives aux ClarifyCheck
étapes, consultez le document sagemaker.workflow.steps. ClarifyCheckClarifyCheck
Step dans SageMaker Pipelines, consultez sagemaker-pipeline-model-monitor-clarify-steps.ipynb
Exemple Créer une étape ClarifyCheck
pour la vérification du biais de données
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="
ml.c5.xlarge
", volume_size_in_gb=120
, sagemaker_session=sagemaker_session, ) data_bias_data_config = DataConfig( s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep
']), label=0, dataset_type="text/csv
", s3_analysis_config_output_path=data_bias_analysis_cfg_output_path, ) data_bias_config = BiasConfig( label_values_or_threshold=[15.0
], facet_name=[8
], facet_values_or_threshold=[[0.5
]] ) data_bias_check_config = DataBiasCheckConfig( data_config=data_bias_data_config, data_bias_config=data_bias_config, )h data_bias_check_step = ClarifyCheckStep( name="DataBiasCheckStep
", clarify_check_config=data_bias_check_config, check_job_config=check_job_config, skip_check=False, register_new_baseline=False supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json
", model_package_group_name="MyModelPackageGroup
" )
QualityCheck Étape
Vous pouvez utiliser l'étape QualityCheck
pour effectuer des suggestions de référence et des vérifications de dérive par rapport à une référence précédente pour la qualité des données ou la qualité du modèle dans un pipeline. Vous pouvez ensuite générer et enregistrer vos références avec le model.register()
et transmettre la sortie de cette méthode à Étape du modèle en utilisant step_args
. Model Monitor peut utiliser ces références pour la vérification de dérive pour les points de terminaison de votre modèle, de sorte que vous n'avez pas besoin d'effectuer une suggestion de référence séparément. L'étape QualityCheck
peut également extraire des références pour la vérification de dérive à partir du registre des modèles. Cette QualityCheck
étape tire parti du conteneur prédéfini Amazon SageMaker Model Monitor, qui possède une gamme de fonctionnalités de surveillance des modèles, notamment la suggestion de contraintes, la génération de statistiques et la validation des contraintes par rapport à une base de référence. Pour plus d’informations, consultez Conteneur préfabriqué Amazon SageMaker Model Monitor.
Configuration de l' QualityCheck étape
Vous pouvez configurer l'étape QualityCheck
pour effectuer l'un des types de vérification suivants chaque fois qu'il est utilisé dans un pipeline.
-
Vérification de la qualité des données
-
Vérification de la qualité du modèle
Pour ce faire, définissez le paramètre quality_check_config
avec l'une des valeurs de type de vérification suivantes :
-
DataQualityCheckConfig
-
ModelQualityCheckConfig
L'étape QualityCheck
lance une tâche de traitement qui exécute le conteneur prédéfini Model Monitor et nécessite des configurations dédiées pour la vérification et la tâche de traitement. QualityCheckConfig
et CheckJobConfig
sont des fonctions d'assistance pour ces configurations qui sont alignées sur la façon dont Model Monitor crée une référence pour la surveillance de la qualité du modèle ou de la qualité des données. Pour plus d'informations sur les suggestions de référence Model Monitor, veuillez consulter Création d'une référence et Création d'une référence de qualité du modèle.
Contrôle des comportements d'étape pour la vérification de dérive
L'étape QualityCheck
nécessite les deux indicateurs booléens suivants pour le contrôle de son comportement :
-
skip_check
: ce paramètre indique si la vérification de dérive par rapport à la référence précédente est ignorée ou non. S'il est défini surFalse
, la référence précédente du type de contrôle configuré doit être disponible. -
register_new_baseline
: ce paramètre indique si une référence recalculée est accessible via les propriétésBaselineUsedForDriftCheckConstraints
etBaselineUsedForDriftCheckStatistics
de l'étape. S'il est défini surFalse
, la référence précédente du type de contrôle configuré doit également être disponible. Vous pouvez y accéder via les propriétésBaselineUsedForDriftCheckConstraints
etBaselineUsedForDriftCheckStatistics
.
Pour plus d’informations, consultez Calcul de référence, détection de la dérive et cycle de vie avec Amazon SageMaker Model Building Pipelines ClarifyCheck et QualityCheck étapes.
Utilisation des références
Vous pouvez spécifier une référence précédente directement via les paramètres supplied_baseline_statistics
et supplied_baseline_constraints
, ou vous pouvez simplement spécifier le model_package_group_name
pour que l'étape QualityCheck
extraie les DriftCheckBaselines
sur le dernier package de modèles approuvé dans le groupe de modèles. Lorsque vous spécifiez le model_package_group_name
, les supplied_baseline_constraints
et les supplied_baseline_statistics
, l'étape QualityCheck
utilise la référence spécifiée par les supplied_baseline_constraints
et les supplied_baseline_statistics
sur le type de vérification de l'étape QualityCheck
que êtes en train d'exécuter.
Pour plus d'informations sur l'utilisation des exigences relatives aux QualityCheck
étapes, consultez le document sagemaker.workflow.steps. QualityCheckQualityCheck
Step dans SageMaker Pipelines, consultez sagemaker-pipeline-model-monitor-clarify-steps.ipynb
Exemple Créer une étape QualityCheck
pour la vérification de la qualité des données
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="
ml.c5.xlarge
", volume_size_in_gb=120
, sagemaker_session=sagemaker_session, ) data_quality_check_config = DataQualityCheckConfig( baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train
"].S3Output.S3Uri, dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"), output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep']) ) data_quality_check_step = QualityCheckStep( name="DataQualityCheckStep", skip_check=False, register_new_baseline=False, quality_check_config=data_quality_check_config, check_job_config=check_job_config, supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json
", supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json
", model_package_group_name="MyModelPackageGroup
" )
Étape EMR
Vous pouvez utiliser l'étape EMR Amazon SageMaker Model Building Pipelines pour traiter les étapes Amazon EMR sur un cluster Amazon EMR en cours d'exécution ou demander au pipeline de créer et de gérer un cluster Amazon EMR pour vous. Pour plus d'informations sur Amazon EMR, consultez Bien démarrer avec Amazon EMR.
L'étape EMR nécessite que EMRStepConfig
inclue l'emplacement du fichier JAR que doit utiliser le cluster Amazon EMR et tous les arguments à transmettre. Vous fournissez également l'ID du cluster Amazon EMR si vous souhaitez exécuter l'étape sur un cluster EMR en cours d'exécution, ou la configuration du cluster si vous souhaitez que l'étape EMR s'exécute sur un cluster qu'elle crée, gère et résilie pour vous. Les sections suivantes incluent des exemples et des liens vers des exemples de blocs-notes illustrant les deux méthodes.
Note
-
Les étapes EMR exigent que le rôle transmis à votre pipeline ait des autorisations supplémentaires. Vous devez attacher la politique gérée par AWS :
AmazonSageMakerPipelinesIntegrations
à votre rôle de pipeline, ou vous assurer que le rôle inclut les autorisations de cette politique. -
L'étape EMR n'est pas prise en charge sur EMR sans serveur, ni sur Amazon EMR sur EKS.
-
Si vous traitez une étape EMR sur un cluster en cours d'exécution, vous pouvez uniquement utiliser un cluster dans l'un des états suivants :
STARTING
,BOOTSTRAPPING
,RUNNING
ouWAITING
. -
Si vous traitez les étapes EMR sur un cluster en cours d'exécution, vous pouvez avoir au maximum 256 étapes EMR dans un état
PENDING
sur un cluster EMR. Les étapes EMR soumises au-delà de cette limite entraînent l'échec de l'exécution du pipeline. Vous pouvez envisager d'utiliser Politique de nouvelle tentative pour les étapes du pipeline. Vous pouvez spécifier l'ID ou la configuration du cluster, mais pas les deux.
L'étape EMR repose sur Amazon EventBridge pour surveiller les modifications de l'étape EMR ou de l'état du cluster. Si vous traitez votre tâche Amazon EMR sur un cluster en cours d'exécution, l'étape EMR utilise la règle
SageMakerPipelineExecutionEMRStepStatusUpdateRule
pour surveiller son état. Si vous traitez votre tâche sur un cluster créé pour vous par l'étape EMR, l'étape utilise la règleSageMakerPipelineExecutionEMRClusterStatusRule
pour surveiller les modifications de l'état du cluster. Si l'une de ces EventBridge règles apparaît dans votre AWS compte, ne la supprimez pas, sinon votre étape EMR risque de ne pas être terminée.
Lancement d'une nouvelle tâche sur un cluster Amazon EMR en cours d'exécution
Si vous souhaitez lancer une nouvelle tâche sur un cluster Amazon EMR en cours d'exécution, vous devez transmettre l'ID du cluster sous forme de chaîne à l'argument cluster_id
de EMRStep
. L'exemple suivant illustre cette procédure.
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_config = EMRStepConfig( jar="
jar-location
", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) step_emr = EMRStep ( name="EMRSampleStep", # required cluster_id="j-1ABCDEFG2HIJK", # include cluster_id to use a running cluster step_config=emr_config, # required display_name="My EMR Step", description="Pipeline step to execute EMR job" )
Pour un exemple de bloc-notes qui vous guide à travers un exemple complet, voir SageMaker Pipelines EMR Step With Running EMR Cluster.
Lancement d'une nouvelle tâche sur un nouveau cluster Amazon EMR
Si vous souhaitez lancer une nouvelle tâche sur un nouveau cluster EMRStep
créé pour vous, fournissez la configuration de votre cluster sous forme de dictionnaire avec la même structure qu'une demande RunJobFlow. Toutefois, n'incluez pas les champs suivants dans la configuration de votre cluster :
[
Name
][
Steps
][
AutoTerminationPolicy
][
Instances
][KeepJobFlowAliveWhenNoSteps
][
Instances
][TerminationProtected
]
Tous les autres arguments RunJobFlow
peuvent être utilisés dans votre configuration de cluster. Pour plus de détails sur la syntaxe des demandes, consultez RunJobFlow.
L'exemple suivant transmet une configuration de cluster à une définition d'étape EMR, qui invite l'étape à lancer une nouvelle tâche sur un nouveau cluster EMR. Dans cet exemple, la configuration du cluster EMR inclut des spécifications pour les nœuds primaires et principaux du cluster EMR. Pour plus d'informations sur les types de nœuds Amazon EMR, consultez Comprendre les types de nœuds : nœuds primaires, principaux et de tâches.
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_step_config = EMRStepConfig( jar="
jar-location
", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) # include your cluster configuration as a dictionary emr_cluster_config = { "Applications": [ { "Name": "Spark", } ], "Instances":{ "InstanceGroups":[ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.2xlarge" }, { "InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "m5.2xlarge" } ] }, "BootstrapActions":[], "ReleaseLabel": "emr-6.6.0", "JobFlowRole": "job-flow-role
", "ServiceRole": "service-role
" } emr_step = EMRStep( name="emr-step", cluster_id=None, display_name="emr_step", description="MyEMRStepDescription", step_config=emr_step_config, cluster_config=emr_cluster_config )
Pour un exemple de bloc-notes qui vous guide à travers un exemple complet, voir SageMaker Pipelines EMR Step With Cluster Lifecycle Management
Notebook Job Step
Utilisez a NotebookJobStep
pour exécuter votre SageMaker Notebook Job de manière non interactive en tant qu'étape du pipeline. Pour plus d'informations sur SageMaker Notebook Jobs, consultezSageMaker Emplois sur ordinateur portable.
A NotebookJobStep
nécessite au minimum un bloc-notes d'entrée, une URI d'image et un nom de noyau. Pour plus d'informations sur les exigences relatives aux étapes de Notebook Job et sur les autres paramètres que vous pouvez définir pour personnaliser votre étape, consultez sagemaker.workflow.steps. NotebookJob
L'exemple suivant utilise un minimum d'arguments pour définir unNotebookJobStep
.
from sagemaker.workflow.notebook_job_step import NotebookJobStep notebook_job_step = NotebookJobStep( input_notebook=
input_notebook
, image_uri=image_uri
, kernel_name=kernel_name
)
Votre étape de NotebookJobStep
pipeline est traitée comme une tâche de SageMaker carnet. Vous pouvez donc suivre l'état d'exécution dans le tableau de bord des tâches de bloc-notes de l'interface utilisateur de Studio Classic si vous incluez des balises spécifiques dans l'tags
argument. Pour plus de détails sur les balises à inclure, consultezConsultez les tâches de votre bloc-notes dans le tableau de bord de l'interface utilisateur de Studio.
De plus, si vous planifiez votre tâche de bloc-notes à l'aide du SDK SageMaker Python, vous ne pouvez spécifier que certaines images pour exécuter votre tâche de bloc-notes. Pour plus d’informations, consultez Contraintes d'image pour les SageMaker tâches de bloc-notes du SDK Python.
Étape de Fail
Vous utilisez a FailStep
pour arrêter l'exécution d'un Amazon SageMaker Model Building Pipelines lorsqu'une condition ou un état souhaité n'est pas atteint et pour marquer l'exécution de ce pipeline comme ayant échoué. L'FailStep
vous permet également de saisir un message d'erreur personnalisé indiquant la cause de l'échec d'exécution du pipeline.
Note
Lorsqu'une FailStep
et les autres étapes du pipeline s'exécutent simultanément, le pipeline ne se termine pas tant que toutes les étapes simultanées ne sont pas terminées.
Limites d'utilisation pour FailStep
-
Vous ne pouvez pas ajouter d'
FailStep
vers la listeDependsOn
des autres étapes. Pour plus d’informations, consultez Dépendance personnalisée entre étapes. -
Les autres étapes ne peuvent pas faire référence à l'
FailStep
. C'est toujours la dernière étape de l'exécution d'un pipeline. -
Vous ne pouvez pas réessayer une exécution de pipeline se terminant par une
FailStep
.
Vous pouvez créer l'ErrorMessage
FailStep
sous la forme d'une chaîne de texte statique. Vous pouvez également utiliser les Paramètres du pipeline, une opération de jointure
L'exemple d'extrait de code suivant utilise une FailStep
avec un ErrorMessage
configuré avec les paramètres du pipeline et une opération de Join
.
from sagemaker.workflow.fail_step import FailStep from sagemaker.workflow.functions import Join from sagemaker.workflow.parameters import ParameterInteger mse_threshold_param = ParameterInteger(name="MseThreshold", default_value=5) step_fail = FailStep( name="
AbaloneMSEFail
", error_message=Join( on=" ", values=["Execution failed due to MSE >
", mse_threshold_param] ), )
Propriétés de l'étape
L'attribut properties
est utilisé pour ajouter des dépendances de données entre les étapes du pipeline. Ces dépendances de données sont ensuite utilisées par les SageMaker pipelines pour construire le DAG à partir de la définition du pipeline. Ces propriétés peuvent être référencées en tant que valeurs d'espace réservé et sont résolues lors de l'exécution.
L'properties
attribut d'une étape SageMaker Pipelines correspond à l'objet renvoyé par un Describe
appel pour le type de SageMaker tâche correspondant. Pour chaque type de tâche, l'appel Describe
renvoie l'objet de réponse suivant :
-
ProcessingStep
— DescribeProcessingJob -
TrainingStep
— DescribeTrainingJob -
TransformStep
— DescribeTransformJob
Pour vérifier quelles propriétés peuvent être référencées pour chaque type d'étape lors de la création de dépendances de données, consultez Data Dependency - Property Reference
Parallélisme d'étapes
Lorsqu'une étape ne dépend d'aucune autre étape, elle est exécutée immédiatement lors de l'exécution du pipeline. Toutefois, l'exécution en parallèle d'un trop grand nombre d'étapes du pipeline peut rapidement épuiser les ressources disponibles. Contrôlez le nombre d'étapes simultanées pour une exécution de pipeline avec ParallelismConfiguration
.
L'exemple suivant utilise ParallelismConfiguration
pour définir la limite des étapes simultanées à cinq.
pipeline.create( parallelism_config=ParallelismConfiguration(5), )
Dépendance des données entre étapes
Vous définissez la structure de votre DAG en spécifiant les relations des données entre les étapes. Pour créer des dépendances de données entre les étapes, transmettez les propriétés d'une étape comme entrée à une autre étape du pipeline. L'étape recevant l'entrée n'est démarrée qu'après l'étape fournissant l'entrée a terminé l'exécution.
Une dépendance de données utilise une JsonPath notation au format suivant. Ce format traverse le fichier de propriétés JSON, ce qui signifie que vous pouvez ajouter autant d'instances <propiété>
que nécessaire pour atteindre la propriété imbriquée souhaitée dans le fichier. Pour plus d'informations sur la JsonPath notation, consultez le JsonPath dépôt.
<step_name>
.properties.<property>
.<property>
Ce qui suit montre comment spécifier un compartiment Amazon S3 à l'aide de la propriété ProcessingOutputConfig
d'une étape de traitement.
step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri
Pour créer la dépendance des données, transmettez le compartiment à une étape d'entraînement comme suit.
from sagemaker.workflow.pipeline_context import PipelineSession sklearn_train = SKLearn(..., sagemaker_session=PipelineSession()) step_train = TrainingStep( name="CensusTrain", step_args=sklearn_train.fit(inputs=TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train_data"].S3Output.S3Uri )) )
Pour vérifier quelles propriétés peuvent être référencées pour chaque type d'étape lors de la création de dépendances de données, consultez Data Dependency - Property Reference
Dépendance personnalisée entre étapes
Lorsque vous spécifiez une dépendance aux données, SageMaker Pipelines fournit la connexion de données entre les étapes. Une étape peut également accéder aux données d'une étape précédente sans utiliser directement les SageMaker pipelines. Dans ce cas, vous pouvez créer une dépendance personnalisée qui indique à SageMaker Pipelines de ne pas démarrer une étape avant la fin de l'exécution d'une autre étape. Vous créez une dépendance personnalisée en spécifiant l'attribut DependsOn
d'une étape.
À titre d'exemple, ce qui suit définit une étape C
qui démarre seulement après que les deux étapes A
et B
terminent leur exécution.
{ 'Steps': [ {'Name':'A', ...}, {'Name':'B', ...}, {'Name':'C', 'DependsOn': ['A', 'B']} ] }
SageMaker Pipelines émet une exception de validation si la dépendance crée une dépendance cyclique.
L'exemple suivant crée une étape d'entraînement qui démarre après l'exécution d'une étape de traitement.
processing_step = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step])
L'exemple suivant crée une étape d'entraînement qui ne démarre pas tant que l'exécution de deux étapes de traitement différentes n'est pas terminée.
processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step_1, processing_step_2])
Ce qui suit fournit un autre moyen de créer la dépendance personnalisée.
training_step.add_depends_on([processing_step_1]) training_step.add_depends_on([processing_step_2])
L'exemple suivant crée une étape d'entraînement qui reçoit les entrées d'une étape de traitement et attend que l'exécution d'une autre étape de traitement se termine.
processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep( ..., inputs=TrainingInput( s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[ "train_data" ].S3Output.S3Uri ) training_step.add_depends_on([processing_step_2])
L'exemple suivant montre comment extraire une liste de chaînes des dépendances personnalisées d'une étape.
custom_dependencies = training_step.depends_on
Utiliser une image personnalisée dans une étape
Vous pouvez utiliser n'importe laquelle des images de conteneur SageMaker Deep Learning
Vous pouvez également utiliser votre propre conteneur avec des étapes de pipeline. Comme vous ne pouvez pas créer d'image depuis Amazon SageMaker Studio Classic, vous devez créer votre image à l'aide d'une autre méthode avant de l'utiliser avec Amazon SageMaker Model Building Pipelines.
Pour utiliser votre propre conteneur lors de la création des étapes pour votre pipeline, incluez l'URI de l'image dans la définition de l'estimateur. Pour plus d'informations sur l'utilisation de votre propre conteneur avec SageMaker, consultez la section Utilisation de conteneurs Docker avec SageMaker.