Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Puede crear una canalización convirtiendo las funciones de Python en pasos de canalización mediante el decorador @step
, creando dependencias entre esas funciones para crear un gráfico de canalización (o gráfico acíclico dirigido [DAG]) y pasando los nodos de hoja de ese gráfico como una lista de pasos a la canalización. En las siguientes secciones, se explica este procedimiento en detalle con ejemplos.
Temas
Conversión de una función en un paso
Para crear un paso con el decorador @step
, anote la función con @step
. En el siguiente ejemplo, se muestra una función decorada con @step
que preprocesa los datos.
from sagemaker.workflow.function_step import step
@step
def preprocess(raw_data):
df = pandas.read_csv(raw_data)
...
return procesed_dataframe
step_process_result = preprocess(raw_data)
Cuando invocas una función @step
decorada con símbolos, la SageMaker IA devuelve una DelayedReturn
instancia en lugar de ejecutar la función. Una instancia de DelayedReturn
es un proxy de la devolución real de esa función. La instancia de DelayedReturn
se puede pasar a otra función como argumento o directamente a una instancia de canalización como paso. Para obtener información sobre la DelayedReturn
clase, consulta sagemaker.workflow.function_step. DelayedReturn
Creación de dependencias entre los pasos
Al crear una dependencia entre dos pasos, se crea una conexión entre los pasos del gráfico de canalización. En las siguientes secciones, se presentan varias formas de crear una dependencia entre los pasos de la canalización.
Dependencias de datos a través de argumentos de entrada
Al pasar la salida DelayedReturn
de una función como entrada a otra función, se crea automáticamente una dependencia de datos en el DAG de la canalización. En el siguiente ejemplo, al pasar la salida DelayedReturn
de la función preprocess
a la función train
, se crea una dependencia entre preprocess
y train
.
from sagemaker.workflow.function_step import step
@step
def preprocess(raw_data):
df = pandas.read_csv(raw_data)
...
return procesed_dataframe
@step
def train(training_data):
...
return trained_model
step_process_result = preprocess(raw_data)
step_train_result = train(step_process_result)
El ejemplo anterior define una función de entrenamiento que está decorada con @step
. Cuando se invoca esta función, recibe como entrada la salida DelayedReturn
del paso de la canalización de preprocesamiento. Al invocar la función de entrenamiento, se devuelve otra instancia de DelayedReturn
. Esta instancia contiene la información sobre todos los pasos anteriores definidos en esa función (es decir, el paso preprocess
de este ejemplo) que forman el DAG de la canalización.
En el ejemplo anterior, la función preprocess
devuelve un valor único. Para ver tipos de devoluciones más complejas, como listas o tuplas, consulte Limitaciones.
Definición de dependencias personalizadas
En el ejemplo anterior, la función train
recibió la salida DelayedReturn
de preprocess
y creó una dependencia. Si desea definir la dependencia de forma explícita sin pasar la salida del paso anterior, utilice la función add_depends_on
con el paso. Puede usar la función get_step()
para recuperar el paso subyacente de su instancia de DelayedReturn
y, a continuación, llamar a add_depends_on
_on con la dependencia como entrada. Para ver la definición de la función get_step()
, consulte sagemaker.workflow.step_outputs.get_steppreprocess
y train
usando get_step()
y add_depends_on()
.
from sagemaker.workflow.step_outputs import get_step
@step
def preprocess(raw_data):
df = pandas.read_csv(raw_data)
...
processed_data = ..
return s3.upload(processed_data)
@step
def train():
training_data = s3.download(....)
...
return trained_model
step_process_result = preprocess(raw_data)
step_train_result = train()
get_step(step_train_result).add_depends_on([step_process_result])
Transferencia de datos desde y hacia una función decorada con @step
para un paso de canalización tradicional
Puede crear una canalización que incluya un paso decorado con @step
y un paso de canalización tradicional y pasar datos entre ellos. Por ejemplo, puede utilizar ProcessingStep
para procesar los datos y pasar su resultado a la función de entrenamiento decorada con @step
. En el siguiente ejemplo, un paso de entrenamiento decorado con @step
hace referencia al resultado de un paso de procesamiento.
# Define processing step
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
sklearn_processor = SKLearnProcessor(
framework_version='1.2-1',
role='arn:aws:iam::123456789012:role/SagemakerExecutionRole',
instance_type='ml.m5.large',
instance_count='1',
)
inputs = [
ProcessingInput(source=input_data
, destination="/opt/ml/processing/input"),
]
outputs = [
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]
process_step = ProcessingStep(
name="MyProcessStep",
step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'),
)
# Define a @step
-decorated train step which references the
# output of a processing step
@step
def train(train_data_path, test_data_path):
...
return trained_model
step_train_result = train(
process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
Uso de ConditionStep
con pasos decorados con @step
Canalizaciones admite una clase ConditionStep
que evalúa los resultados de los pasos anteriores para decidir qué medidas tomar en la canalización. También puede utilizar ConditionStep
con un paso decorado con @step
. Para usar la salida de cualquier paso decorado con @step
con ConditionStep
, introduzca la salida de ese paso como argumento para ConditionStep
. En el siguiente ejemplo, el paso de condición recibe el resultado del paso de evaluación del modelo decorado con @step
.
# Define steps
@step(name="evaluate")
def evaluate_model():
# code to evaluate the model
return {
"rmse":rmse_value
}
@step(name="register")
def register_model():
# code to register the model
...
# Define ConditionStep
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep
conditionally_register = ConditionStep(
name="conditional_register",
conditions=[
ConditionGreaterThanOrEqualTo(
# Output of the evaluate step must be json serializable
left=evaluate_model()["rmse"], #
right=5,
)
],
if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
else_steps=[register_model()],
)
Definición de una canalización utilizando la salida DelayedReturn
de los pasos
La canalización se define de la misma manera independientemente de si se utiliza o no un decorador @step
. Cuando pasa una instancia de DelayedReturn
a su canalización, no es necesario pasar una lista completa de pasos para crear la canalización. El SDK deduce automáticamente los pasos anteriores en función de las dependencias que defina. Todos los pasos anteriores de los objetos Step
que ha pasado a la canalización o a los objetos DelayedReturn
se incluyen en el gráfico de la canalización. En el siguiente ejemplo, la canalización recibe el objeto DelayedReturn
de la función train
. SageMaker La IA añade el preprocess
paso, como paso anteriortrain
, al gráfico de canalización.
from sagemaker.workflow.pipeline import Pipeline
pipeline = Pipeline(
name="<pipeline-name>
",
steps=[step_train_result],
sagemaker_session=<sagemaker-session>
,
)
Si no hay datos o dependencias personalizadas entre los pasos y ejecuta varios pasos en paralelo, el gráfico de canalización tiene más de un nodo hoja. Pase todos estos nodos de hoja en una lista al argumento steps
de la definición de la canalización, tal y como se muestra en el siguiente ejemplo:
@step
def process1():
...
return data
@step
def process2():
...
return data
step_process1_result = process1()
step_process2_result = process2()
pipeline = Pipeline(
name="<pipeline-name>
",
steps=[step_process1_result, step_process2_result],
sagemaker_session=sagemaker-session
,
)
Cuando se ejecuta la canalización, ambos pasos se ejecutan en paralelo.
Solo se pasan los nodos de hoja del gráfico a la canalización, ya que los nodos de hoja contienen información sobre todos los pasos anteriores definidos mediante dependencias de datos o personalizadas. Al compilar la canalización, la SageMaker IA también deduce todos los pasos subsiguientes que forman el gráfico de la canalización y añade cada uno de ellos como un paso independiente a la canalización.
Creación de una canalización
Cree una canalización mediante una llamada a pipeline.create()
, tal y como se muestra en el siguiente fragmento. Para obtener más información sobre create()
, consulte sagemaker.workflow.pipeline.Pipeline.create
role = "pipeline-role
"
pipeline.create(role)
Cuando llamaspipeline.create()
, la SageMaker IA compila todos los pasos definidos como parte de la instancia de canalización. SageMaker La IA carga la función serializada, los argumentos y todos los demás artefactos relacionados con los pasos en Amazon S3.
Los datos residen en el bucket de S3 de acuerdo con la siguiente estructura:
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
se define en el archivo de configuración de SageMaker AI y se aplica a toda la canalización. Si no está definido, se utiliza el depósito de SageMaker IA predeterminado.
nota
Cada vez que la SageMaker IA compila una canalización, SageMaker guarda las funciones, los argumentos y las dependencias serializados de los pasos en una carpeta con la fecha y hora actuales. Esto ocurre cada vez que ejecuta pipeline.create()
, pipeline.update()
, pipeline.upsert()
o pipeline.definition()
.