Invocar uma função remota - Amazon SageMaker

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Invocar uma função remota

Para invocar uma função dentro do decorador @remote, use um dos seguintes métodos:

Se você usar o método decorador @remote para invocar uma função, o trabalho de treinamento aguardará a conclusão da função antes de iniciar uma nova tarefa. No entanto, se você usar o RemoteExecutorAPI, poderá executar mais de um trabalho em paralelo. As seções a seguir mostram as duas formas de invocar uma função.

Use um decorador @remote para invocar uma função

Você pode usar o decorador @remote para anotar uma função. SageMaker transformará o código dentro do decorador em um trabalho SageMaker de treinamento. O trabalho de treinamento então invocará a função dentro do decorador e aguardará a conclusão do trabalho. O exemplo de código a seguir mostra como importar as bibliotecas necessárias, iniciar uma SageMaker instância e anotar uma multiplicação de matrizes com o decorador @remote.

from sagemaker.remote_function import remote import numpy as np @remote(instance_type="ml.m5.large") def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) assert (matrix_multiply(a, b) == np.array([1,2])).all()

O decorador é definido da seguinte forma.

def remote( *, **kwarg): ...

Quando você invoca uma função decorada, o SageMaker SDK Python carrega todas as exceções geradas por um erro na memória local. No exemplo de código a seguir, a primeira chamada para a função de divisão é concluída com êxito e o resultado é carregado na memória local. Na segunda chamada para a função de divisão, o código retorna um erro e esse erro é carregado na memória local.

from sagemaker.remote_function import remote import pytest @remote() def divide(a, b): return a/b # the underlying job is completed successfully # and the function return is loaded assert divide(10, 5) == 2 # the underlying job fails with "AlgorithmError" # and the function exception is loaded into local memory with pytest.raises(ZeroDivisionError): divide(10, 0)
nota

A função decorada é executada como um trabalho remoto. Se o encadeamento for interrompido, o trabalho subjacente não será interrompido.

Como alterar o valor de uma variável local

A função decoradora é executada em uma máquina remota. Alterar uma variável não local ou argumentos de entrada dentro de uma função decorada não alterará o valor local.

No exemplo de código a seguir, uma lista e um dicionário são anexados à função decoradora. Isso não muda quando a função decoradora é invocada.

a = [] @remote def func(): a.append(1) # when func is invoked, a in the local memory is not modified func() func() # a stays as [] a = {} @remote def func(a): # append new values to the input dictionary a["key-2"] = "value-2" a = {"key": "value"} func(a) # a stays as {"key": "value"}

Para alterar o valor de uma variável local declarada dentro de uma função decoradora, retorne a variável da função. O exemplo de código a seguir mostra que o valor de uma variável local é alterado quando ela é retornada da função.

a = {"key-1": "value-1"} @remote def func(a): a["key-2"] = "value-2" return a a = func(a) -> {"key-1": "value-1", "key-2": "value-2"}

Serialização e desserialização de dados

Quando você invoca uma função remota, serializa SageMaker automaticamente os argumentos da função durante os estágios de entrada e saída. Os argumentos e retornos da função são serializados usando o cloudpickle. SageMaker suporta a serialização dos seguintes objetos e funções do Python.

  • Objetos Python integrados, incluindo dictos, listas, floats, ints, strings, valores booleanos e tuplas

  • matrizes numéricas

  • Dataframes Pandas

  • Conjuntos de dados e estimadores do Scikit-learn

  • PyTorch modelos

  • TensorFlow modelos

  • A classe Booster para XGBoost

O seguinte pode ser usado com algumas limitações.

  • Dask DataFrames

  • A XGBoost classe Dmatrix

  • TensorFlow conjuntos de dados e subclasses

  • PyTorch modelos

A seção a seguir contém as melhores práticas para usar as classes Python anteriores com algumas limitações em sua função remota, informações sobre onde SageMaker armazena seus dados serializados e como gerenciar o acesso a eles.

Práticas recomendadas para classes de Python com suporte limitado para serialização remota de dados

Você pode usar as classes Python listadas nesta seção com limitações. As próximas seções abordam as práticas recomendadas de como usar as seguintes classes de Python.

  • Dask DataFrames

  • A XGBoost DMatric turma

  • TensorFlow conjuntos de dados e subclasses

  • PyTorch modelos

Dask é uma biblioteca de código aberto usada para computação paralela em Python. Esta seção mostra o seguinte.

  • Como passar um Dask DataFrame para sua função remota

  • Como converter estatísticas resumidas de um Dask DataFrame em um Pandas DataFrame

Como passar um Dask DataFrame para sua função remota

Os Dask DataFrames costumam ser usados para processar grandes conjuntos de dados porque podem conter conjuntos de dados que exigem mais memória do que a disponível. Isso ocorre porque um Dask DataFrame não carrega seus dados locais na memória. Se você passar um Dask DataFrame como argumento de função para sua função remota, o Dask poderá passar uma referência aos dados em seu disco local ou armazenamento em nuvem, em vez dos dados em si. O código a seguir mostra um exemplo de como passar um Dask DataFrame dentro de sua função remota que funcionará em um espaço vazio DataFrame.

#Do not pass a Dask DataFrame to your remote function as follows def clean(df: dask.DataFrame ): cleaned = df[] \ ...

O Dask carregará os dados do Dask DataFrame na memória somente quando você usar o. DataFrame Se você quiser usar um Dask DataFrame dentro de uma função remota, forneça o caminho para os dados. Em seguida, o Dask lerá o conjunto de dados diretamente do caminho de dados que você especifica quando o código é executado.

O exemplo de código a seguir mostra como usar um Dask DataFrame dentro da função clean remota. No exemplo de código, raw_data_path é passado para clean em vez do Dask DataFrame. Quando o código é executado, o conjunto de dados é lido diretamente na localização de um bucket do Amazon S3 especificado em raw_data_path. Em seguida, a persist função mantém o conjunto de dados na memória para facilitar a random_split função subsequente e gravado de volta no caminho de dados de saída em um bucket do S3 usando as funções do DataFrame API Dask.

import dask.dataframe as dd @remote( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) #pass the data path to your remote function rather than the Dask DataFrame itself def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]): df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\ .drop(['column_b', 'column_c'], axis=1)\ .persist() #keep the data in memory to facilitate the following random_split operation train_df, test_df = cleaned.random_split(split_ratio, random_state=10) train_df.to_parquet(os.path.join(output_data_path, 'train') test_df.to_parquet(os.path.join(output_data_path, 'test')) clean("s3://amzn-s3-demo-bucket/raw/", "s3://amzn-s3-demo-bucket/cleaned/", split_ratio=[0.7, 0.3])
Como converter estatísticas resumidas de um Dask DataFrame em um Pandas DataFrame

As estatísticas resumidas de um Dask DataFrame podem ser convertidas em Pandas DataFrame invocando o compute método conforme mostrado no código de exemplo a seguir. No exemplo, o bucket do S3 contém um grande Dask DataFrame que não cabe na memória ou em um dataframe Pandas. No exemplo a seguir, uma função remota escaneia o conjunto de dados e retorna um Dask DataFrame contendo as estatísticas de saída describe para um Pandas. DataFrame

executor = RemoteExecutor( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) future = executor.submit(lambda: dd.read_parquet("s3://amzn-s3-demo-bucket/raw/").describe().compute()) future.result()

DMatrixé uma estrutura de dados interna usada pelo XGBoost para carregar dados. Não é possível selecionar um DMatrix objeto para se mover facilmente entre as sessões de computação. DMatrixAs instâncias que passam diretamente falharão com umSerializationError.

Como passar um objeto de dados para sua função remota e treinar com XGBoost

Para converter um Pandas DataFrame em uma DMatrix instância e usá-lo para treinar em sua função remota, passe-o diretamente para a função remota, conforme mostrado no exemplo de código a seguir.

import xgboost as xgb @remote def train(df, params): #Convert a pandas dataframe into a DMatrix DataFrame and use it for training dtrain = DMatrix(df) return xgb.train(dtrain, params)

TensorFlow conjuntos de dados e subclasses são objetos internos usados pelo TensorFlow para carregar dados durante o treinamento. TensorFlow conjuntos de dados e subclasses não podem ser selecionados para se moverem facilmente entre as sessões de computação. A aprovação direta de conjuntos de dados ou subclasses do Tensorflow falhará com um SerializationError. Use o Tensorflow I/O APIs para carregar dados do armazenamento, conforme mostrado no exemplo de código a seguir.

import tensorflow as tf import tensorflow_io as tfio @remote def train(data_path: str, params): dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt")) ... train("s3://amzn-s3-demo-bucket/data", {})

PyTorch os modelos são serializáveis e podem ser passados entre o ambiente local e a função remota. Se o ambiente local e o ambiente remoto tiverem tipos de dispositivos diferentes, como (GPUseCPUs), você não poderá devolver um modelo treinado ao ambiente local. Por exemplo, se o código a seguir for desenvolvido em um ambiente local sem, GPUs mas executado em uma instância comGPUs, retornar o modelo treinado diretamente resultará em DeserializationError a.

# Do not return a model trained on GPUs to a CPU-only environment as follows @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") # a device without GPU capabilities model = Net().to(device) # train the model ... return model model = train(...) #returns a DeserializationError if run on a device with GPU

Para retornar um modelo treinado em um GPU ambiente que contenha somente CPU recursos, use a E/S do PyTorch modelo APIs diretamente, conforme mostrado no exemplo de código abaixo.

import s3fs model_path = "s3://amzn-s3-demo-bucket/folder/" @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") model = Net().to(device) # train the model ... fs = s3fs.FileSystem() with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file: torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU) train(...) #use the model to train on either CPUs or GPUs model = Net() fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file: model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))

Onde SageMaker armazena seus dados serializados

Quando você invoca uma função remota, serializa SageMaker automaticamente os argumentos da função e retorna valores durante os estágios de entrada e saída. Esses dados serializados são armazenados em um diretório raiz no bucket do S3. Você especifica o diretório raiz, <s3_root_uri>, em um arquivo de configuração. O parâmetro job_name é gerado automaticamente para você.

No diretório raiz, SageMaker cria uma <job_name> pasta que contém seu diretório de trabalho atual, a função serializada, os argumentos para sua função serializada, os resultados e quaisquer exceções decorrentes da invocação da função serializada.

Em <job_name>, o diretório workdir contém um arquivo compactado do diretório de trabalho atual. O arquivo compactado inclui todos os arquivos Python no diretório de trabalho e o arquivo requirements.txt, que especifica todas as dependências necessárias para executar a função remota.

Veja a seguir um exemplo da estrutura da pasta em um bucket do S3 que você especifica no arquivo de configuração.

<s3_root_uri>/ # specified by s3_root_uri or S3RootUri <job_name>/ #automatically generated for you workdir/workspace.zip # archive of the current working directory (workdir) function/ # serialized function arguments/ # serialized function arguments results/ # returned output from the serialized function including the model exception/ # any exceptions from invoking the serialized function

O diretório raiz que você especifica no bucket do S3 não se destina ao armazenamento de longo prazo. Os dados serializados estão estreitamente vinculados à versão do Python e à versão da estrutura de machine learning (ML) que foram usadas durante a serialização. Se você atualizar a versão Python ou a estrutura de ML, talvez não consiga usar os dados serializados. Em vez disso, faça o seguinte:

  • Armazene o modelo e os artefatos do modelo em um formato independente da versão do Python e da estrutura de ML.

  • Se você atualizar a estrutura Python ou ML, acesse os resultados do modelo no armazenamento de longo prazo.

Importante

Para excluir os dados serializados após um determinado período, defina uma configuração vitalícia no bucket do S3.

nota

Os arquivos serializados com o módulo pickle do Python podem ser menos portáteis do que outros formatos de dados, CSV incluindo Parquet e. JSON Tenha cuidado ao carregar arquivos .pickle de fontes desconhecidas.

Para obter mais informações sobre o que incluir em um arquivo de configuração para uma função remota, consulte Arquivo de configuração.

Acesso aos dados serializados

Os administradores podem fornecer configurações dos dados serializados, incluindo a localização e quaisquer configurações de criptografia em um arquivo de configuração. Por padrão, os dados serializados são criptografados com uma chave AWS Key Management Service (AWS KMS). Os administradores também podem restringir o acesso ao diretório raiz que você especifica no seu arquivo de configuração com uma política do bucket. O arquivo de configuração pode ser compartilhado e usado entre projetos e trabalhos. Para obter mais informações, consulte Arquivo de configuração.

Use o RemoteExecutor API para invocar uma função

Você pode usar o RemoteExecutor API para invocar uma função. SageMaker O Python SDK transformará o código dentro da RemoteExecutor chamada em um trabalho de SageMaker treinamento. O trabalho de treinamento então invocará a função como uma operação assíncrona e retornará um future. Se você usar o RemoteExecutorAPI, poderá executar mais de um trabalho de treinamento em paralelo. Para obter mais informações sobre futuros em Python, consulte Futures.

O exemplo de código a seguir mostra como importar as bibliotecas necessárias, definir uma função, iniciar uma SageMaker instância e usar a API para enviar uma solicitação para executar 2 trabalhos em paralelo.

from sagemaker.remote_function import RemoteExecutor def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) with RemoteExecutor(max_parallel_job=2, instance_type="ml.m5.large") as e: future = e.submit(matrix_multiply, a, b) assert (future.result() == np.array([1,2])).all()

A classe RemoteExecutor é uma implantação da biblioteca concurrent.futures.Executor.

O exemplo de código a seguir mostra como definir uma função e chamá-la usando o RemoteExecutorAPI. Neste exemplo, eles RemoteExecutor enviarão 4 trabalhos no total, mas somente 2 em paralelo. As duas últimas tarefas reutilizarão os clusters com o mínimo de sobrecarga.

from sagemaker.remote_function.client import RemoteExecutor def divide(a, b): return a/b with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e: futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]] for future in futures: print(future.result())

O parâmetro max_parallel_job serve apenas como um mecanismo de limitação de taxa sem otimizar a alocação de recursos computacionais. No exemplo de código anterior, RemoteExecutor não reserva recursos de computação para os dois trabalhos paralelos antes que qualquer trabalho seja enviado. Para obter mais informações sobre max_parallel_job ou outros parâmetros para o decorador @remote, consulte Especificação de métodos e classes de funções remotas.

Classe futura para o RemoteExecutor API

A classe future é uma classe pública que representa a função de retorno do trabalho de treinamento quando ela é invocada de forma assíncrona. A classe future implementa a classe concurrent.futures.Future. Essa classe pode ser usada para realizar operações no trabalho subjacente e carregar dados na memória.