As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Crie um pipeline com funções @step
decoradas
Você pode criar um pipeline convertendo funções do Python em etapas do pipeline usando @step
o decorador, criando dependências entre essas funções para criar um gráfico do pipeline (ou gráfico acíclico direcionado DAG ()) e passando os nós da folha desse gráfico como uma lista de etapas para o pipeline. As seções a seguir explicam esse procedimento detalhadamente com exemplos.
Tópicos
Converter uma função em uma etapa
Para criar uma etapa usando o @step
decorador, anote a função com. @step
O exemplo a seguir mostra uma função @step
-decorada que pré-processa os dados.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
Quando você invoca uma função @step
-decorada, SageMaker retorna uma DelayedReturn
instância em vez de executar a função. Uma DelayedReturn
instância é um proxy para o retorno real dessa função. A DelayedReturn
instância pode ser passada para outra função como argumento ou diretamente para uma instância do pipeline como uma etapa. Para obter informações sobre a DelayedReturn
classe, consulte sagemaker.workflow.function_step. DelayedReturn
Crie dependências entre as etapas
Ao criar uma dependência entre duas etapas, você cria uma conexão entre as etapas no gráfico do pipeline. As seções a seguir apresentam várias maneiras de criar uma dependência entre as etapas do pipeline.
Dependências de dados por meio de argumentos de entrada
Passar a DelayedReturn
saída de uma função como entrada para outra função cria automaticamente uma dependência de dados no pipelineDAG. No exemplo a seguir, passar a DelayedReturn
saída da preprocess
função para a train
função cria uma dependência entre preprocess
e. train
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
O exemplo anterior define uma função de treinamento que é decorada com@step
. Quando essa função é invocada, ela recebe a DelayedReturn
saída da etapa do pipeline de pré-processamento como entrada. A invocação da função de treinamento retorna outra DelayedReturn
instância. Essa instância contém as informações sobre todas as etapas anteriores definidas nessa função (ou seja, a preprocess
etapa neste exemplo) que formam o pipelineDAG.
No exemplo anterior, a preprocess
função retorna um único valor. Para tipos de retorno mais complexos, como listas ou tuplas, consulte. Limitações
Defina dependências personalizadas
No exemplo anterior, a train
função recebeu a DelayedReturn
saída de preprocess
e criou uma dependência. Se você quiser definir a dependência explicitamente sem passar a saída da etapa anterior, use a add_depends_on
função com a etapa. Você pode usar a get_step()
função para recuperar a etapa subjacente de sua DelayedReturn
instância e, em seguida, chamar add_depends_on
_on com a dependência como entrada. Para ver a definição da get_step()
função, consulte sagemaker.workflow.step_outputs.get_steppreprocess
get_step()
e train
usando e. add_depends_on()
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
Transmita dados de e para uma função @step
-decorada para uma etapa tradicional do pipeline
Você pode criar um pipeline que inclua uma etapa @step
decorada e uma etapa tradicional do pipeline e transmita dados entre elas. Por exemplo, você pode usar ProcessingStep
para processar os dados e passar o resultado para a função de treinamento @step
-decorada. No exemplo a seguir, uma etapa @step
de treinamento decorada faz referência à saída de uma etapa de processamento.
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=
input_data
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a
@step
-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
Use ConditionStep
com @step
degraus decorados
SageMaker Os pipelines oferecem suporte a uma ConditionStep
classe que avalia os resultados das etapas anteriores para decidir qual ação tomar no pipeline. Você também pode usar ConditionStep
com um @step
degrau decorado. Para usar a saída de qualquer etapa @step
decorada comConditionStep
, insira a saída dessa etapa como argumento paraConditionStep
. No exemplo a seguir, a etapa de condição recebe a saída da etapa de avaliação do modelo @step
-decorado.
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
Defina um pipeline usando a DelayedReturn
saída das etapas
Você define um pipeline da mesma forma, independentemente de usar ou não um @step
decorador. Ao passar uma DelayedReturn
instância para seu pipeline, você não precisa passar uma lista completa de etapas para criar o pipeline. O infere SDK automaticamente as etapas anteriores com base nas dependências que você define. Todas as etapas anteriores dos Step
objetos que você passou para o pipeline ou DelayedReturn
objetos estão incluídas no gráfico do pipeline. No exemplo a seguir, o pipeline recebe o DelayedReturn
objeto da train
função. SageMaker adiciona a preprocess
etapa, como uma etapa anterior dotrain
, ao gráfico do pipeline.
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_train_result], sagemaker_session=<sagemaker-session>
, )
Se não houver dados ou dependências personalizadas entre as etapas e você executar várias etapas em paralelo, o gráfico do pipeline terá mais de um nó foliar. Passe todos esses nós de folha em uma lista para o steps
argumento em sua definição de pipeline, conforme mostrado no exemplo a seguir:
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session
, )
Quando a tubulação é executada, as duas etapas são executadas paralelamente.
Você só passa os nós da folha do gráfico para o pipeline porque os nós da folha contêm informações sobre todas as etapas anteriores definidas por meio de dados ou dependências personalizadas. Ao compilar o pipeline, SageMaker também infere todas as etapas subsequentes que formam o gráfico do pipeline e adiciona cada uma delas como uma etapa separada ao pipeline.
Criar um pipeline
Crie um pipeline chamandopipeline.create()
, conforme mostrado no trecho a seguir. Para obter detalhescreate()
, consulte SageMaker.Workflow.Pipeline.Pipeline.create
role = "
pipeline-role
" pipeline.create(role)
Quando você chamapipeline.create()
, SageMaker compila todas as etapas definidas como parte da instância do pipeline. SageMaker carrega a função serializada, os argumentos e todos os outros artefatos relacionados à etapa para o Amazon S3.
Os dados residem no bucket do S3 de acordo com a seguinte estrutura:
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
é definido no arquivo de SageMaker configuração e se aplica a todo o pipeline. Se indefinido, o SageMaker bucket padrão será usado.
nota
Sempre que SageMaker compila um pipeline, SageMaker salva as funções, argumentos e dependências serializados das etapas em uma pasta com a data e hora atual. Isso ocorre toda vez que você correpipeline.create()
,pipeline.update()
, pipeline.upsert()
oupipeline.definition()
.