Seleccione sus preferencias de cookies

Usamos cookies esenciales y herramientas similares que son necesarias para proporcionar nuestro sitio y nuestros servicios. Usamos cookies de rendimiento para recopilar estadísticas anónimas para que podamos entender cómo los clientes usan nuestro sitio y hacer mejoras. Las cookies esenciales no se pueden desactivar, pero puede hacer clic en “Personalizar” o “Rechazar” para rechazar las cookies de rendimiento.

Si está de acuerdo, AWS y los terceros aprobados también utilizarán cookies para proporcionar características útiles del sitio, recordar sus preferencias y mostrar contenido relevante, incluida publicidad relevante. Para aceptar o rechazar todas las cookies no esenciales, haga clic en “Aceptar” o “Rechazar”. Para elegir opciones más detalladas, haga clic en “Personalizar”.

Creación de una canalización con funciones decoradas con @step

Modo de enfoque
Creación de una canalización con funciones decoradas con @step - Amazon SageMaker AI

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Puede crear una canalización convirtiendo las funciones de Python en pasos de canalización mediante el decorador @step, creando dependencias entre esas funciones para crear un gráfico de canalización (o gráfico acíclico dirigido [DAG]) y pasando los nodos de hoja de ese gráfico como una lista de pasos a la canalización. En las siguientes secciones, se explica este procedimiento en detalle con ejemplos.

Conversión de una función en un paso

Para crear un paso con el decorador @step, anote la función con @step. En el siguiente ejemplo, se muestra una función decorada con @step que preprocesa los datos.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)

Cuando invocas una función @step decorada con símbolos, la SageMaker IA devuelve una DelayedReturn instancia en lugar de ejecutar la función. Una instancia de DelayedReturn es un proxy de la devolución real de esa función. La instancia de DelayedReturn se puede pasar a otra función como argumento o directamente a una instancia de canalización como paso. Para obtener información sobre la DelayedReturn clase, consulta sagemaker.workflow.function_step. DelayedReturn.

Al crear una dependencia entre dos pasos, se crea una conexión entre los pasos del gráfico de canalización. En las siguientes secciones, se presentan varias formas de crear una dependencia entre los pasos de la canalización.

Al pasar la salida DelayedReturn de una función como entrada a otra función, se crea automáticamente una dependencia de datos en el DAG de la canalización. En el siguiente ejemplo, al pasar la salida DelayedReturn de la función preprocess a la función train, se crea una dependencia entre preprocess y train.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)

El ejemplo anterior define una función de entrenamiento que está decorada con @step. Cuando se invoca esta función, recibe como entrada la salida DelayedReturn del paso de la canalización de preprocesamiento. Al invocar la función de entrenamiento, se devuelve otra instancia de DelayedReturn. Esta instancia contiene la información sobre todos los pasos anteriores definidos en esa función (es decir, el paso preprocess de este ejemplo) que forman el DAG de la canalización.

En el ejemplo anterior, la función preprocess devuelve un valor único. Para ver tipos de devoluciones más complejas, como listas o tuplas, consulte Limitaciones.

En el ejemplo anterior, la función train recibió la salida DelayedReturn de preprocess y creó una dependencia. Si desea definir la dependencia de forma explícita sin pasar la salida del paso anterior, utilice la función add_depends_on con el paso. Puede usar la función get_step() para recuperar el paso subyacente de su instancia de DelayedReturn y, a continuación, llamar a add_depends_on_on con la dependencia como entrada. Para ver la definición de la función get_step(), consulte sagemaker.workflow.step_outputs.get_step. El siguiente ejemplo muestra cómo crear una dependencia entre preprocess y train usando get_step() y add_depends_on().

from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])

Puede crear una canalización que incluya un paso decorado con @step y un paso de canalización tradicional y pasar datos entre ellos. Por ejemplo, puede utilizar ProcessingStep para procesar los datos y pasar su resultado a la función de entrenamiento decorada con @step. En el siguiente ejemplo, un paso de entrenamiento decorado con @step hace referencia al resultado de un paso de procesamiento.

# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a @step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )

Uso de ConditionStep con pasos decorados con @step

Canalizaciones admite una clase ConditionStep que evalúa los resultados de los pasos anteriores para decidir qué medidas tomar en la canalización. También puede utilizar ConditionStep con un paso decorado con @step. Para usar la salida de cualquier paso decorado con @step con ConditionStep, introduzca la salida de ese paso como argumento para ConditionStep. En el siguiente ejemplo, el paso de condición recibe el resultado del paso de evaluación del modelo decorado con @step.

# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )

Definición de una canalización utilizando la salida DelayedReturn de los pasos

La canalización se define de la misma manera independientemente de si se utiliza o no un decorador @step. Cuando pasa una instancia de DelayedReturn a su canalización, no es necesario pasar una lista completa de pasos para crear la canalización. El SDK deduce automáticamente los pasos anteriores en función de las dependencias que defina. Todos los pasos anteriores de los objetos Step que ha pasado a la canalización o a los objetos DelayedReturn se incluyen en el gráfico de la canalización. En el siguiente ejemplo, la canalización recibe el objeto DelayedReturn de la función train. SageMaker La IA añade el preprocess paso, como paso anteriortrain, al gráfico de canalización.

from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )

Si no hay datos o dependencias personalizadas entre los pasos y ejecuta varios pasos en paralelo, el gráfico de canalización tiene más de un nodo hoja. Pase todos estos nodos de hoja en una lista al argumento steps de la definición de la canalización, tal y como se muestra en el siguiente ejemplo:

@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )

Cuando se ejecuta la canalización, ambos pasos se ejecutan en paralelo.

Solo se pasan los nodos de hoja del gráfico a la canalización, ya que los nodos de hoja contienen información sobre todos los pasos anteriores definidos mediante dependencias de datos o personalizadas. Al compilar la canalización, la SageMaker IA también deduce todos los pasos subsiguientes que forman el gráfico de la canalización y añade cada uno de ellos como un paso independiente a la canalización.

Creación de una canalización

Cree una canalización mediante una llamada a pipeline.create(), tal y como se muestra en el siguiente fragmento. Para obtener más información sobre create(), consulte sagemaker.workflow.pipeline.Pipeline.create.

role = "pipeline-role" pipeline.create(role)

Cuando llamaspipeline.create(), la SageMaker IA compila todos los pasos definidos como parte de la instancia de canalización. SageMaker La IA carga la función serializada, los argumentos y todos los demás artefactos relacionados con los pasos en Amazon S3.

Los datos residen en el bucket de S3 de acuerdo con la siguiente estructura:

s3_root_uri/ pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir) step_name/ timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the step execution_id/ step_name/ results # returned output from the serialized function including the model

s3_root_urise define en el archivo de configuración de SageMaker AI y se aplica a toda la canalización. Si no está definido, se utiliza el depósito de SageMaker IA predeterminado.

nota

Cada vez que la SageMaker IA compila una canalización, SageMaker guarda las funciones, los argumentos y las dependencias serializados de los pasos en una carpeta con la fecha y hora actuales. Esto ocurre cada vez que ejecuta pipeline.create(), pipeline.update(), pipeline.upsert() o pipeline.definition().

PrivacidadTérminos del sitioPreferencias de cookies
© 2025, Amazon Web Services, Inc o sus afiliados. Todos los derechos reservados.