Invoca una función remota - Amazon SageMaker

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Invoca una función remota

Para invocar una función dentro del decorador @remote, utilice cualquiera de los métodos siguientes:

Si utiliza el método decorador @remote para invocar una función, el trabajo de entrenamiento esperará a que la función se complete antes de iniciar una nueva tarea. Sin embargo, si usa el RemoteExecutorAPI, puede ejecutar más de un trabajo en paralelo. En las siguientes secciones se muestran las dos formas de invocar una función.

Use un decorador @remote para invocar una función

Puedes usar el decorador @remote para anotar una función. SageMaker transformará el código del decorador en un SageMaker trabajo de formación. A continuación, el trabajo de entrenamiento invocará la función dentro del decorador y esperará a que se complete el trabajo. El siguiente ejemplo de código muestra cómo importar las bibliotecas necesarias, iniciar una SageMaker instancia y anotar una multiplicación de matrices con el decorador @remote.

from sagemaker.remote_function import remote import numpy as np @remote(instance_type="ml.m5.large") def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) assert (matrix_multiply(a, b) == np.array([1,2])).all()

El decorador se define de la siguiente manera.

def remote( *, **kwarg): ...

Cuando se invoca una función decorada, SageMaker Python SDK carga cualquier excepción provocada por un error en la memoria local. En el siguiente ejemplo de código, la primera llamada a la función de división se completa correctamente y el resultado se carga en la memoria local. En la segunda llamada a la función de división, el código devuelve un error y este error se carga en la memoria local.

from sagemaker.remote_function import remote import pytest @remote() def divide(a, b): return a/b # the underlying job is completed successfully # and the function return is loaded assert divide(10, 5) == 2 # the underlying job fails with "AlgorithmError" # and the function exception is loaded into local memory with pytest.raises(ZeroDivisionError): divide(10, 0)
nota

La función decorada se ejecuta como un trabajo remoto. Si se interrumpe la trama, el trabajo subyacente no se detendrá.

Cómo cambiar el valor de una variable local

La función decoradora se ejecuta en una máquina remota. Cambiar una variable no local o los argumentos de entrada dentro de una función decorada no cambiará el valor local.

En el siguiente ejemplo de código, se añaden una lista y un diccionario dentro de la función de decorador. Esto no cambia cuando se invoca la función de decorador.

a = [] @remote def func(): a.append(1) # when func is invoked, a in the local memory is not modified func() func() # a stays as [] a = {} @remote def func(a): # append new values to the input dictionary a["key-2"] = "value-2" a = {"key": "value"} func(a) # a stays as {"key": "value"}

Para cambiar el valor de una variable local declarada dentro de una función de decorador, devuelva la variable de la función. El siguiente ejemplo de código muestra que el valor de una variable local cambia cuando la función la devuelve.

a = {"key-1": "value-1"} @remote def func(a): a["key-2"] = "value-2" return a a = func(a) -> {"key-1": "value-1", "key-2": "value-2"}

Serialización y deserialización de datos

Al invocar una función remota, serializa SageMaker automáticamente los argumentos de la función durante las etapas de entrada y salida. Los argumentos y los retornos de la función se serializan con cloudpickle. SageMaker admite la serialización de los siguientes objetos y funciones de Python.

  • Objetos de Python integrados que incluyen dictados, listas, flotantes, enteros, cadenas, valores booleanos y tuplas

  • Matrices Numpy

  • Marcos de datos de Pandas

  • Estimadores y conjuntos de datos de Scikit-learn

  • PyTorch modelos

  • TensorFlow modelos

  • La clase Booster para XGBoost

Se puede utilizar lo siguiente con algunas limitaciones.

  • Dask DataFrames

  • La clase XGBoost Dmatrix

  • TensorFlow conjuntos de datos y subclases

  • PyTorch modelos

La siguiente sección contiene las prácticas recomendadas para usar las clases de Python anteriores con algunas limitaciones en la función remota, información sobre dónde se SageMaker almacenan los datos serializados y cómo administrar el acceso a ellos.

Prácticas recomendadas para las clases de Python con asistencia limitada para la serialización remota de datos

Puede usar las clases de Python enumeradas en esta sección con limitaciones. En las siguientes secciones, se analizan las prácticas recomendadas para utilizar las siguientes clases de Python.

  • Dask DataFrames

  • La XGBoost DMatric clase

  • TensorFlow conjuntos de datos y subclases

  • PyTorch modelos

Dask es una biblioteca de código abierto que se utiliza para la computación paralela en Python. En esta sección se muestra lo siguiente.

  • ¿Cómo pasar un Dask DataFrame a tu función remota

  • ¿Cómo convertir las estadísticas resumidas de un Dask DataFrame en un Pandas DataFrame

¿Cómo pasar un Dask DataFrame a tu función remota

Los Dask DataFrames se utilizan a menudo para procesar conjuntos de datos de gran tamaño porque pueden contener conjuntos de datos que requieren más memoria de la disponible. Esto se debe a que un Dask DataFrame no carga los datos locales en la memoria. Si pasa un argumento de Dask DataFrame como función a su función remota, Dask puede pasar una referencia a los datos de su disco local o almacenamiento en la nube, en lugar de a los datos en sí. En el siguiente código se muestra un ejemplo de cómo introducir un Dask DataFrame dentro de una función remota que funcionará cuando esté vacío. DataFrame

#Do not pass a Dask DataFrame to your remote function as follows def clean(df: dask.DataFrame ): cleaned = df[] \ ...

Dask cargará los datos del Dask DataFrame en la memoria solo cuando utilices el. DataFrame Si desea utilizar un Dask DataFrame dentro de una función remota, indique la ruta de acceso a los datos. Luego, Dask leerá el conjunto de datos directamente desde la ruta de datos que especifique cuando se ejecute el código.

El siguiente ejemplo de código muestra cómo usar un Dask DataFrame dentro de la función remota. clean En el ejemplo de código, raw_data_path se pasa a clean en lugar de a DataFrame Dask. Cuando se ejecuta el código, el conjunto de datos se lee directamente desde la ubicación de un bucket de Amazon S3 especificado en raw_data_path. Luego, la persist función guarda el conjunto de datos en la memoria para facilitar la random_split función posterior y lo vuelve a escribir en la ruta de datos de salida en un depósito de S3 mediante las funciones de Dask DataFrame API.

import dask.dataframe as dd @remote( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) #pass the data path to your remote function rather than the Dask DataFrame itself def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]): df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\ .drop(['column_b', 'column_c'], axis=1)\ .persist() #keep the data in memory to facilitate the following random_split operation train_df, test_df = cleaned.random_split(split_ratio, random_state=10) train_df.to_parquet(os.path.join(output_data_path, 'train') test_df.to_parquet(os.path.join(output_data_path, 'test')) clean("s3://amzn-s3-demo-bucket/raw/", "s3://amzn-s3-demo-bucket/cleaned/", split_ratio=[0.7, 0.3])
¿Cómo convertir las estadísticas resumidas de un Dask DataFrame en un Pandas DataFrame

Las estadísticas resumidas de un Dask se DataFrame pueden convertir en un Pandas DataFrame invocando el compute método, como se muestra en el siguiente código de ejemplo. En el ejemplo, el depósito S3 contiene un Dask grande DataFrame que no cabe en la memoria ni en un marco de datos de Pandas. En el siguiente ejemplo, una función remota escanea el conjunto de datos y devuelve un Dask DataFrame que contiene las estadísticas de salida a un Pandas. describe DataFrame

executor = RemoteExecutor( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) future = executor.submit(lambda: dd.read_parquet("s3://amzn-s3-demo-bucket/raw/").describe().compute()) future.result()

DMatrixes una estructura de datos interna utilizada XGBoost para cargar datos. No se puede separar un DMatrix objeto para poder moverlo fácilmente entre sesiones de cómputo. DMatrixLas instancias que pasen directamente fallarán con unSerializationError.

Cómo pasar un objeto de datos a tu función remota y entrenar con XGBoost

Para convertir un Pandas DataFrame en una DMatrix instancia y usarlo como entrenamiento en tu función remota, pásalo directamente a la función remota, tal y como se muestra en el siguiente ejemplo de código.

import xgboost as xgb @remote def train(df, params): #Convert a pandas dataframe into a DMatrix DataFrame and use it for training dtrain = DMatrix(df) return xgb.train(dtrain, params)

TensorFlow los conjuntos de datos y las subclases son objetos internos que se utilizan TensorFlow para cargar datos durante el entrenamiento. TensorFlow los conjuntos de datos y las subclases no se pueden agrupar para poder moverse fácilmente entre sesiones de procesamiento. Al pasar directamente conjuntos de datos o subclases de Tensorflow, se producirá un error con un SerializationError. Usa la E/S de Tensorflow APIs para cargar datos del almacenamiento, como se muestra en el siguiente ejemplo de código.

import tensorflow as tf import tensorflow_io as tfio @remote def train(data_path: str, params): dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt")) ... train("s3://amzn-s3-demo-bucket/data", {})

PyTorch los modelos se pueden serializar y se pueden transferir entre su entorno local y la función remota. Si su entorno local y remoto tienen diferentes tipos de dispositivos, como (GPUsyCPUs), no podrá devolver un modelo entrenado a su entorno local. Por ejemplo, si el siguiente código se desarrolla en un entorno local GPUs pero se ejecuta en una instancia conGPUs, si devuelve directamente el modelo entrenado, se generará unDeserializationError.

# Do not return a model trained on GPUs to a CPU-only environment as follows @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") # a device without GPU capabilities model = Net().to(device) # train the model ... return model model = train(...) #returns a DeserializationError if run on a device with GPU

Para devolver un modelo entrenado en un GPU entorno que solo contenga CPU capacidades, utilice APIs directamente la E/S del PyTorch modelo, tal y como se muestra en el ejemplo de código que aparece a continuación.

import s3fs model_path = "s3://amzn-s3-demo-bucket/folder/" @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") model = Net().to(device) # train the model ... fs = s3fs.FileSystem() with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file: torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU) train(...) #use the model to train on either CPUs or GPUs model = Net() fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file: model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))

¿Dónde se SageMaker almacenan los datos serializados

Al invocar una función remota, serializa SageMaker automáticamente los argumentos de la función y los valores devueltos durante las etapas de entrada y salida. Estos datos serializados se almacenan en un directorio raíz en el bucket de S3. El directorio raíz, <s3_root_uri>, se especifica en un archivo de configuración. El parámetro job_name se genera automáticamente para usted.

En el directorio raíz, SageMaker crea una <job_name> carpeta que contiene el directorio de trabajo actual, la función serializada, los argumentos de la función serializada, los resultados y cualquier excepción que haya surgido al invocar la función serializada.

Bajo <job_name>, el directorio workdir contiene un archivo comprimido de su directorio de trabajo actual. El archivo comprimido incluye todos los archivos de Python del directorio de trabajo y el archivo requirements.txt, que especifica las dependencias necesarias para ejecutar la función remota.

A continuación, se muestra un ejemplo de la estructura de carpetas de un bucket de S3 que se especifica en el archivo de configuración.

<s3_root_uri>/ # specified by s3_root_uri or S3RootUri <job_name>/ #automatically generated for you workdir/workspace.zip # archive of the current working directory (workdir) function/ # serialized function arguments/ # serialized function arguments results/ # returned output from the serialized function including the model exception/ # any exceptions from invoking the serialized function

El directorio raíz que especifique en su bucket de S3 no está diseñado para el almacenamiento a largo plazo. Los datos serializados están estrechamente relacionados con la versión de Python y la versión del marco de machine learning (ML) que se utilizaron durante la serialización. Si actualiza la versión de Python o el marco ML, es posible que no pueda usar sus datos serializados. En su lugar, haga lo siguiente.

  • Guarde el modelo y los artefactos del modelo en un formato que sea independiente de su versión de Python y del marco de ML.

  • Si actualiza su marco de Python o ML, acceda a los resultados de su modelo desde su almacenamiento a largo plazo.

importante

Para eliminar los datos serializados después de un período de tiempo específico, establezca una configuración de vida útil en su bucket de S3.

nota

Los archivos que se serializan con el módulo pickle de Python pueden ser menos portátiles que otros formatos de datosCSV, incluidos Parquet y. JSON Tenga cuidado de no cargar archivos separados de fuentes desconocidas.

Para obtener más información sobre qué incluir en un archivo de configuración para una función remota, consulte Archivo de configuración.

Acceder a los datos serializados

Los administradores pueden proporcionar la configuración de los datos serializados, incluida su ubicación y cualquier configuración de cifrado en un archivo de configuración. De forma predeterminada, los datos serializados se cifran con una clave AWS Key Management Service ()AWS KMS. Los administradores también pueden restringir el acceso al directorio raíz que especifique en el archivo de configuración mediante una política de bucket. El archivo de configuración se puede compartir y usar en todos los proyectos y trabajos. Para obtener más información sobre los archivos de configuración, consulte Archivo de configuración.

Utilice la RemoteExecutor API para invocar una función

Puede usar el RemoteExecutor API para invocar una función. SageMaker Python SDK transformará el código de la RemoteExecutor llamada en un trabajo de SageMaker formación. A continuación, el trabajo de entrenamiento invocará la función como una operación asíncrona y devolverá un future. Si usa el RemoteExecutorAPI, puede ejecutar más de un trabajo de entrenamiento en paralelo. Para obtener más información sobre los futuros en Python, consulte Futures.

El siguiente ejemplo de código muestra cómo importar las bibliotecas necesarias, definir una función, iniciar una SageMaker instancia y utilizarlas API para enviar una solicitud para ejecutar 2 trabajos en paralelo.

from sagemaker.remote_function import RemoteExecutor def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) with RemoteExecutor(max_parallel_job=2, instance_type="ml.m5.large") as e: future = e.submit(matrix_multiply, a, b) assert (future.result() == np.array([1,2])).all()

La clase RemoteExecutor es una implementación de la biblioteca concurrent.futures.Executor.

En el ejemplo de código siguiente se muestra cómo definir una función y cómo llamarla utilizando la RemoteExecutorAPI. En este ejemplo, el RemoteExecutor enviará 4 trabajos en total, pero solo 2 en paralelo. Los dos últimos trabajos reutilizarán los clústeres con una sobrecarga mínima.

from sagemaker.remote_function.client import RemoteExecutor def divide(a, b): return a/b with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e: futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]] for future in futures: print(future.result())

El parámetro max_parallel_job solo sirve como mecanismo de limitación de velocidad sin optimizar la asignación de los recursos de computación. En el ejemplo de código anterior, RemoteExecutor no reserva recursos de computación para los dos trabajos paralelos antes de que se envíe ningún trabajo. Para obtener más información sobre max_parallel_job u otros parámetros del decorador @remote, consulte Especificación de métodos y clases de funciones remotas.

Clase futura para RemoteExecutor API

Una clase future es una clase pública que representa la función de retorno del trabajo de entrenamiento cuando se invoca de forma asíncrona. La clase future implementa la clase Concurrent.futures.Future. Esta clase se puede usar para realizar operaciones en el trabajo subyacente y cargar datos en la memoria.