As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Criar um pipeline com funções decoradas por @step
Você pode criar um pipeline convertendo funções do Python em etapas de pipeline usando o decorador @step
, criando dependências entre essas funções para criar um gráfico de pipeline (ou gráfico acíclico direcionado (DAG)) e passando os nós da folha desse gráfico como uma lista de etapas para o pipeline. As seções a seguir explicam esse procedimento detalhadamente com exemplos.
Tópicos
Converter uma função em uma etapa
Para criar uma etapa usando o decorador @step
, anote a função com @step
. O exemplo a seguir mostra uma função decorada por @step
que pré-processa os dados.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
Quando você invoca uma função @step
-decorada, a SageMaker IA retorna uma DelayedReturn
instância em vez de executar a função. Uma instância DelayedReturn
é um proxy para o retorno real da função. A instância DelayedReturn
pode ser passada para outra função como argumento ou diretamente para uma instância do pipeline como uma etapa. Para obter informações sobre a DelayedReturn
classe, consulte sagemaker.workflow.function_step. DelayedReturn
Crie dependências entre as etapas
Ao criar uma dependência entre duas etapas, você cria uma conexão entre as etapas no gráfico do pipeline. As seções a seguir apresentam várias maneiras de criar uma dependência entre as etapas do pipeline.
Dependências de dados por meio de argumentos de entrada
Passar a saída DelayedReturn
de uma função como entrada para outra função cria automaticamente uma dependência de dados no DAG do pipeline. No exemplo a seguir, passar a saída DelayedReturn
da função preprocess
para a função train
cria uma dependência entre preprocess
e train
.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
O exemplo anterior define uma função de treinamento que é decorada com@step
. Quando essa função é invocada, ela recebe a saída DelayedReturn
da etapa do pipeline de pré-processamento como entrada. A invocação da função de treinamento retorna outra instância DelayedReturn
. Essa instância contém as informações sobre todas as etapas anteriores definidas nessa função (ou seja, a etapa preprocess
no exemplo) que formam o DAG do pipeline.
No exemplo anterior, a função preprocess
retorna um único valor. Para tipos de retorno mais complexos, como listas ou tuplas, consulte Limitações.
Definir dependências personalizadas
No exemplo anterior, a função train
recebeu a saída DelayedReturn
de preprocess
e criou uma dependência. Se você quiser definir a dependência explicitamente sem passar a saída da etapa anterior, use a função add_depends_on
com a etapa. Você pode usar a função get_step()
para recuperar a etapa subjacente de sua instância DelayedReturn
e, em seguida, chamar add_depends_on
_on com a dependência como entrada. Para ver a definição da função get_step()
, consulte sagemaker.workflow.step_outputs.get_steppreprocess
e train
usando get_step()
e add_depends_on()
.
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
Transmita dados de e para uma função decorada por @step
para uma etapa tradicional de pipeline
Você pode criar um pipeline que inclua uma etapa decorada por @step
e uma etapa tradicional de pipeline para transmitir dados entre elas. Por exemplo, você pode usar ProcessingStep
para processar os dados e passar o resultado para a função de treinamento decorada por @step
. No exemplo a seguir, uma etapa de treinamento decorada por @step
faz referência à saída de uma etapa de processamento.
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=
input_data
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a
@step
-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
Use ConditionStep
com etapas decoradas por @step
Os pipelines oferecem apoio a uma classe ConditionStep
que avalia os resultados das etapas anteriores para decidir qual ação tomar no pipeline. Você também pode usar ConditionStep
com uma etapa decorada por @step
. Para usar a saída de qualquer etapa decorada por @step
com ConditionStep
, insira a saída dessa etapa como argumento para ConditionStep
. No exemplo a seguir, a etapa de condição recebe a saída da etapa de avaliação de modelo decorada por @step
.
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
Definir um pipeline usando a saída DelayedReturn
das etapas
Você define um pipeline da mesma forma, independentemente de usar ou não um decorador @step
. Ao passar uma instância DelayedReturn
para seu pipeline, você não precisa passar uma lista completa de etapas para criar o pipeline. O SDK infere automaticamente as etapas anteriores com base nas dependências que você define. Todas as etapas anteriores dos objetos Step
que você passou para o pipeline ou objetos DelayedReturn
são incluídas no gráfico do pipeline. No exemplo a seguir, o pipeline recebe o objeto DelayedReturn
da função train
. SageMaker A IA adiciona a preprocess
etapa, como etapa anteriortrain
, ao gráfico do pipeline.
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_train_result], sagemaker_session=<sagemaker-session>
, )
Se não houver dados ou dependências personalizadas entre as etapas e você executar várias etapas em paralelo, o gráfico do pipeline terá mais de um nó de folha. Passe todos esses nós de folha em uma lista para o argumento steps
em sua definição de pipeline, conforme mostrado no seguinte exemplo:
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session
, )
Quando o pipeline é executado, as duas etapas são executadas paralelamente.
Você só passa os nós de folha do gráfico para o pipeline porque os nós de folha contêm informações sobre todas as etapas anteriores definidas por meio de dados ou dependências personalizadas. Ao compilar o pipeline, a SageMaker IA também infere todas as etapas subsequentes que formam o gráfico do pipeline e adiciona cada uma delas como uma etapa separada ao pipeline.
Criar um pipeline
Crie um pipeline chamando pipeline.create()
, como mostrado no trecho a seguir. Para obter detalhes sobre create()
, consulte sagemaker.workflow.pipeline.Pipeline.create
role = "
pipeline-role
" pipeline.create(role)
Quando você ligapipeline.create()
, a SageMaker IA compila todas as etapas definidas como parte da instância do pipeline. SageMaker A IA carrega a função serializada, os argumentos e todos os outros artefatos relacionados à etapa para o Amazon S3.
Os dados residem no bucket do S3 de acordo com a seguinte estrutura:
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
é definido no arquivo de configuração do SageMaker AI e se aplica a todo o pipeline. Se indefinido, o bucket de SageMaker IA padrão será usado.
nota
Toda vez que a SageMaker IA compila um pipeline, a SageMaker IA salva as funções, argumentos e dependências serializados das etapas em uma pasta com a data e hora atual. Isso ocorre toda vez que você executa pipeline.create()
, pipeline.update()
, pipeline.upsert()
ou pipeline.definition()
.