Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Crea una canalización con funciones decoradas @step
Puedes crear una canalización convirtiendo las funciones de Python en pasos de canalización mediante el @step
decorador, creando dependencias entre esas funciones para crear un gráfico de canalización (o gráfico acíclico dirigido (DAG)) y pasando los nodos de hoja de esa gráfica como una lista de pasos a la canalización. En las siguientes secciones se explica este procedimiento en detalle con ejemplos.
Temas
Convierte una función en un paso
Para crear un paso con el @step
decorador, anota la función con. @step
En el siguiente ejemplo, se muestra una función @step
decorada con símbolos que preprocesa los datos.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
Al invocar una función @step
decorada con símbolos, SageMaker devuelve una DelayedReturn
instancia en lugar de ejecutar la función. Una DelayedReturn
instancia es un proxy del retorno real de esa función. La DelayedReturn
instancia se puede pasar a otra función como argumento o directamente a una instancia de canalización como paso. Para obtener información sobre la DelayedReturn
clase, consulta sagemaker.workflow.function_step. DelayedReturn
Crea dependencias entre los pasos
Al crear una dependencia entre dos pasos, se crea una conexión entre los pasos del gráfico de canalización. En las siguientes secciones, se presentan varias formas de crear una dependencia entre los pasos de la canalización.
Dependencias de datos mediante argumentos de entrada
Al pasar la DelayedReturn
salida de una función como entrada a otra función, se crea automáticamente una dependencia de datos en la canalizaciónDAG. En el siguiente ejemplo, al pasar la DelayedReturn
salida de la preprocess
función a la train
función se crea una dependencia entre preprocess
ytrain
.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
El ejemplo anterior define una función de entrenamiento que está decorada con@step
. Cuando se invoca esta función, recibe como entrada el DelayedReturn
resultado del paso de la canalización de preprocesamiento. Al invocar la función de entrenamiento, se devuelve otra DelayedReturn
instancia. Esta instancia contiene la información sobre todos los pasos anteriores definidos en esa función (es decir, el preprocess
paso de este ejemplo) que forman la canalizaciónDAG.
En el ejemplo anterior, la preprocess
función devuelve un único valor. Para obtener tipos de retorno más complejos, como listas o tuplas, consulte. Limitaciones
Defina las dependencias personalizadas
En el ejemplo anterior, la train
función recibió el DelayedReturn
resultado de preprocess
y creó una dependencia. Si desea definir la dependencia de forma explícita sin pasar el resultado del paso anterior, utilice la add_depends_on
función con el paso. Puedes usar la get_step()
función para recuperar el paso subyacente de su DelayedReturn
instancia y, a continuación, llamar a add_depends_on
_on con la dependencia como entrada. Para ver la definición de la get_step()
función, consulta sagemaker.workflow.step_outputs.get_steppreprocess
train
get_step()
add_depends_on()
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
Transfiere datos desde y hacia una función @step
decorada con símbolos a un paso de canalización tradicional
Puede crear una canalización que incluya un @step
escalón decorado y un escalón de canalización tradicional y pasar datos entre ellos. Por ejemplo, puede utilizarlos ProcessingStep
para procesar los datos y pasar su resultado a la función @step
de entrenamiento decorada con letras. En el siguiente ejemplo, un paso @step
de entrenamiento decorado hace referencia al resultado de un paso de procesamiento.
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=
input_data
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a
@step
-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
ConditionStep
Utilízalo con peldaños @step
decorados
Pipelines apoya a una ConditionStep
clase que evalúa los resultados de los pasos anteriores para decidir qué medidas tomar en el proceso. También se puede utilizar ConditionStep
con un @step
escalón decorado. Para usar el resultado de cualquier paso @step
decorado con un símboloConditionStep
, introduce el resultado de ese paso como argumento para. ConditionStep
En el siguiente ejemplo, el paso de condición recibe el resultado del paso de evaluación del modelo @step
decorado con bordes.
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
Defina una canalización utilizando el DelayedReturn
resultado de los pasos
Una canalización se define de la misma manera independientemente de si se utiliza o no un @step
decorador. Cuando pasas una DelayedReturn
instancia a tu canalización, no necesitas pasar una lista completa de pasos para crear la canalización. Deduce SDK automáticamente los pasos anteriores en función de las dependencias que definas. Todos los pasos anteriores de los Step
objetos que has pasado a la canalización o a los DelayedReturn
objetos se incluyen en el gráfico de canalización. En el siguiente ejemplo, la canalización recibe el DelayedReturn
objeto de la train
función. SageMaker añade el preprocess
paso, como paso anteriortrain
, al gráfico de canalización.
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_train_result], sagemaker_session=<sagemaker-session>
, )
Si no hay datos o dependencias personalizadas entre los pasos y ejecutas varios pasos en paralelo, el gráfico de canalización tiene más de un nodo hoja. Transfiere todos estos nodos de hoja de una lista al steps
argumento de la definición de tu canalización, como se muestra en el siguiente ejemplo:
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session
, )
Cuando se ejecuta la canalización, ambos pasos se ejecutan en paralelo.
Solo se pasan los nodos de hoja del gráfico a la canalización, ya que los nodos de hoja contienen información sobre todos los pasos anteriores definidos mediante datos o dependencias personalizadas. Al compilar la canalización, SageMaker también deduce todos los pasos subsiguientes que forman el gráfico de canalización y añade cada uno de ellos como un paso independiente a la canalización.
Creación de una canalización
Crea una canalización mediante una llamadapipeline.create()
, tal y como se muestra en el siguiente fragmento. Para obtener más información al respectocreate()
, consulte SageMaker.workflow.Pipeline.Pipeline.create.
role = "
pipeline-role
" pipeline.create(role)
Cuando llamas, compila todos los pasos definidos como parte de la instancia de pipeline.create()
SageMaker canalización. SageMaker carga la función serializada, los argumentos y todos los demás artefactos relacionados con los pasos en Amazon S3.
Los datos residen en el depósito de S3 de acuerdo con la siguiente estructura:
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
se define en el archivo de SageMaker configuración y se aplica a toda la canalización. Si no está definido, se utiliza el SageMaker depósito predeterminado.
nota
Cada vez SageMaker que compila una canalización, SageMaker guarda las funciones, los argumentos y las dependencias serializados de los pasos en una carpeta con la hora actual. Esto ocurre cada vez que ejecutas, o. pipeline.create()
pipeline.update()
pipeline.upsert()
pipeline.definition()