Invoquer une fonction distante - Amazon SageMaker

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Invoquer une fonction distante

Pour invoquer une fonction dans le décorateur @remote, utilisez l'une des méthodes suivantes :

Si vous utilisez la méthode @remote decorator pour invoquer une fonction, la tâche d'entraînement attendra que la fonction soit terminée avant de démarrer une nouvelle tâche. Toutefois, si vous utilisez le RemoteExecutorAPI, vous pouvez exécuter plusieurs tâches en parallèle. Les sections suivantes montrent les deux manières d'invoquer une fonction.

Utilisation d'un décorateur @remote pour invoquer une fonction

Vous pouvez utiliser le décorateur @remote pour annoter une fonction. SageMaker transformera le code contenu dans le décorateur en tâche de SageMaker formation. La tâche d'entraînement invoquera ensuite la fonction dans le décorateur et attendra la fin de la tâche. L'exemple de code suivant montre comment importer les bibliothèques requises, démarrer une SageMaker instance et annoter une multiplication matricielle avec le décorateur @remote.

from sagemaker.remote_function import remote import numpy as np @remote(instance_type="ml.m5.large") def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) assert (matrix_multiply(a, b) == np.array([1,2])).all()

Le décorateur est défini comme suit.

def remote( *, **kwarg): ...

Lorsque vous invoquez une fonction décorée, SageMaker Python SDK charge toutes les exceptions provoquées par une erreur dans la mémoire locale. Dans l'exemple de code suivant, le premier appel à la fonction de division se termine correctement et le résultat est chargé dans la mémoire locale. Lors du deuxième appel à la fonction de division, le code renvoie une erreur et cette erreur est chargée dans la mémoire locale.

from sagemaker.remote_function import remote import pytest @remote() def divide(a, b): return a/b # the underlying job is completed successfully # and the function return is loaded assert divide(10, 5) == 2 # the underlying job fails with "AlgorithmError" # and the function exception is loaded into local memory with pytest.raises(ZeroDivisionError): divide(10, 0)
Note

La fonction décorée est exécutée en tant que tâche distante. Si le thread est interrompu, la tâche sous-jacente ne sera pas arrêtée.

Comment modifier la valeur d'une variable locale

La fonction de décorateur est exécutée sur une machine distante. La modification d'une variable non locale ou d'arguments d'entrée dans une fonction décorée ne modifiera pas la valeur locale.

Dans l'exemple de code suivant, une liste et un dictionnaire sont ajoutés dans la fonction de décoration. Cela ne change pas lorsque la fonction de décorateur est invoquée.

a = [] @remote def func(): a.append(1) # when func is invoked, a in the local memory is not modified func() func() # a stays as [] a = {} @remote def func(a): # append new values to the input dictionary a["key-2"] = "value-2" a = {"key": "value"} func(a) # a stays as {"key": "value"}

Pour modifier la valeur d'une variable locale déclarée dans une fonction de décoration, renvoyez la variable depuis la fonction. L'exemple de code suivant montre que la valeur d'une variable locale est modifiée lorsqu'elle est renvoyée par la fonction.

a = {"key-1": "value-1"} @remote def func(a): a["key-2"] = "value-2" return a a = func(a) -> {"key-1": "value-1", "key-2": "value-2"}

Sérialisation et désérialisation des données

Lorsque vous invoquez une fonction distante, elle sérialise SageMaker automatiquement les arguments de votre fonction pendant les étapes d'entrée et de sortie. Les arguments et les retours des fonctions sont sérialisés à l'aide de cloudpickle. SageMaker prend en charge la sérialisation des objets et fonctions Python suivants.

  • Les objets Python intégrés, notamment des dictionnaires, des listes, des valeurs flottantes, des entiers, des chaînes, des valeurs booléennes et des tuples

  • Tableaux numpy

  • Dataframes Pandas

  • Jeux de données et estimateurs Scikit-learn

  • PyTorch modèles

  • TensorFlow modèles

  • La classe Booster pour XGBoost

Les éléments suivants peuvent être utilisés avec certaines restrictions.

  • Dask DataFrames

  • La XGBoost classe Dmatrix

  • TensorFlow ensembles de données et sous-classes

  • PyTorch modèles

La section suivante contient les meilleures pratiques relatives à l'utilisation des classes Python précédentes, avec certaines limitations dans votre fonction de télécommande, des informations sur l'emplacement de stockage SageMaker de vos données sérialisées et sur la manière de gérer l'accès à celles-ci.

Bonnes pratiques pour les classes Python avec une prise en charge limitée de la sérialisation de données distantes

Vous pouvez utiliser les classes Python répertoriées dans cette section avec certaines restrictions. Les sections suivantes présentent les bonnes pratiques relatives à l'utilisation des classes Python suivantes.

  • Dask DataFrames

  • La XGBoost DMatric classe

  • TensorFlow ensembles de données et sous-classes

  • PyTorch modèles

Dask est une bibliothèque open source utilisée pour le calcul parallèle dans Python. Cette section montre ce qui suit.

  • Comment transférer un Dask DataFrame à votre télécommande

  • Comment convertir les statistiques récapitulatives d'un Dask DataFrame en Pandas DataFrame

Comment transférer un Dask DataFrame à votre télécommande

Les Dask DataFrames sont souvent utilisés pour traiter de grands ensembles de données car ils peuvent contenir des ensembles de données nécessitant plus de mémoire que celle disponible. Cela est dû au fait qu'un Dask DataFrame ne charge pas vos données locales en mémoire. Si vous transmettez un Dask DataFrame en tant qu'argument de fonction à votre fonction distante, Dask peut transmettre une référence aux données de votre disque local ou de votre stockage dans le cloud, au lieu des données elles-mêmes. Le code suivant montre un exemple de passage d'un Dask DataFrame dans votre fonction de télécommande qui fonctionnera à vide. DataFrame

#Do not pass a Dask DataFrame to your remote function as follows def clean(df: dask.DataFrame ): cleaned = df[] \ ...

Dask chargera les données du Dask DataFrame en mémoire uniquement lorsque vous utiliserez le. DataFrame Si vous souhaitez utiliser un Dask DataFrame dans une fonction distante, indiquez le chemin d'accès aux données. Ensuite, Dask lira le jeu de données directement à partir du chemin de données que vous spécifiez lors de l'exécution du code.

L'exemple de code suivant montre comment utiliser un Dask DataFrame dans la fonction clean de télécommande. Dans l'exemple de code, raw_data_path est passé à clean au lieu du Dask DataFrame. Lorsque le code s'exécute, le jeu de données est lu directement depuis l'emplacement d'un compartiment Amazon S3 spécifié dans raw_data_path. La persist fonction conserve ensuite l'ensemble de données en mémoire pour faciliter la random_split fonction suivante et le réécrit dans le chemin des données de sortie dans un compartiment S3 à l'aide des DataFrame API fonctions Dask.

import dask.dataframe as dd @remote( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) #pass the data path to your remote function rather than the Dask DataFrame itself def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]): df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\ .drop(['column_b', 'column_c'], axis=1)\ .persist() #keep the data in memory to facilitate the following random_split operation train_df, test_df = cleaned.random_split(split_ratio, random_state=10) train_df.to_parquet(os.path.join(output_data_path, 'train') test_df.to_parquet(os.path.join(output_data_path, 'test')) clean("s3://amzn-s3-demo-bucket/raw/", "s3://amzn-s3-demo-bucket/cleaned/", split_ratio=[0.7, 0.3])
Comment convertir les statistiques récapitulatives d'un Dask DataFrame en Pandas DataFrame

Les statistiques récapitulatives d'un Dask DataFrame peuvent être converties en Pandas DataFrame en invoquant la compute méthode, comme indiqué dans l'exemple de code suivant. Dans l'exemple, le compartiment S3 contient un grand disque dur DataFrame qui ne peut pas tenir dans la mémoire ou dans une trame de données Pandas. Dans l'exemple suivant, une fonction distante analyse l'ensemble de données et renvoie un Dask DataFrame contenant les statistiques de sortie describe d'un Pandas DataFrame.

executor = RemoteExecutor( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) future = executor.submit(lambda: dd.read_parquet("s3://amzn-s3-demo-bucket/raw/").describe().compute()) future.result()

DMatrixest une structure de données interne utilisée par XGBoost pour charger des données. Un DMatrix objet ne peut pas être sélectionné afin de pouvoir passer facilement d'une session de calcul à une autre. Le passage direct DMatrix des instances échouera avec unSerializationError.

Comment transmettre un objet de données à votre télécommande et vous entraîner avec XGBoost

Pour convertir un Pandas DataFrame en DMatrix instance et l'utiliser pour l'entraînement à votre fonction à distance, transmettez-le directement à la fonction distante, comme indiqué dans l'exemple de code suivant.

import xgboost as xgb @remote def train(df, params): #Convert a pandas dataframe into a DMatrix DataFrame and use it for training dtrain = DMatrix(df) return xgb.train(dtrain, params)

TensorFlow les ensembles de données et les sous-classes sont des objets internes utilisés TensorFlow pour charger des données pendant l'entraînement. TensorFlow les ensembles de données et les sous-classes ne peuvent pas être sélectionnés afin de passer facilement d'une session de calcul à une autre. La transmission directe de jeux de données ou de sous-classes Tensorflow échouera avec une SerializationError. Utilisez les E/S Tensorflow APIs pour charger les données depuis le stockage, comme indiqué dans l'exemple de code suivant.

import tensorflow as tf import tensorflow_io as tfio @remote def train(data_path: str, params): dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt")) ... train("s3://amzn-s3-demo-bucket/data", {})

PyTorch les modèles sont sérialisables et peuvent être transmis entre votre environnement local et la fonction distante. Si votre environnement local et votre environnement distant ont des types d'appareils différents, tels que (GPUsetCPUs), vous ne pouvez pas renvoyer un modèle entraîné dans votre environnement local. Par exemple, si le code suivant est développé dans un environnement local GPUs sans être exécuté dans une instance avecGPUs, le renvoi direct du modèle entraîné entraînera unDeserializationError.

# Do not return a model trained on GPUs to a CPU-only environment as follows @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") # a device without GPU capabilities model = Net().to(device) # train the model ... return model model = train(...) #returns a DeserializationError if run on a device with GPU

Pour transformer un modèle entraîné dans un GPU environnement en un modèle contenant uniquement des CPU fonctionnalités, utilisez APIs directement les E/S du PyTorch modèle, comme indiqué dans l'exemple de code ci-dessous.

import s3fs model_path = "s3://amzn-s3-demo-bucket/folder/" @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") model = Net().to(device) # train the model ... fs = s3fs.FileSystem() with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file: torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU) train(...) #use the model to train on either CPUs or GPUs model = Net() fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file: model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))

Où sont SageMaker stockées vos données sérialisées

Lorsque vous invoquez une fonction distante, elle sérialise SageMaker automatiquement les arguments de votre fonction et les valeurs renvoyées pendant les étapes d'entrée et de sortie. Ces données sérialisées sont stockées dans un répertoire racine de votre compartiment S3. Vous spécifiez le répertoire racine, <s3_root_uri>, dans un fichier de configuration. Le paramètre job_name est automatiquement généré pour vous.

Sous le répertoire racine, SageMaker crée un <job_name> dossier contenant votre répertoire de travail actuel, votre fonction sérialisée, les arguments de votre fonction sérialisée, les résultats et toutes les exceptions résultant de l'invocation de la fonction sérialisée.

Sous <job_name>, le répertoire workdir contient une archive compressée de votre répertoire de travail actuel. L'archive compressée inclut tous les fichiers Python de votre répertoire de travail ainsi que le fichier requirements.txt, qui spécifie les dépendances nécessaires pour exécuter votre fonction distante.

Voici un exemple de structure de dossiers sous un compartiment S3 que vous spécifiez dans votre fichier de configuration.

<s3_root_uri>/ # specified by s3_root_uri or S3RootUri <job_name>/ #automatically generated for you workdir/workspace.zip # archive of the current working directory (workdir) function/ # serialized function arguments/ # serialized function arguments results/ # returned output from the serialized function including the model exception/ # any exceptions from invoking the serialized function

Le répertoire racine que vous spécifiez dans votre compartiment S3 n'est pas destiné au stockage à long terme. Les données sérialisées sont étroitement liées à la version Python et à la version du framework de machine learning (ML) utilisées lors de la sérialisation. Si vous mettez à niveau la version Python ou le framework de machine learning, vous ne pourrez peut-être pas utiliser vos données sérialisées. Procédez plutôt comme suit.

  • Stockez votre modèle et les artefacts de votre modèle dans un format indépendant de votre version Python et de votre framework de machine learning.

  • Si vous mettez à niveau votre Python ou framework de machine learning, accédez aux résultats de votre modèle depuis votre stockage à long terme.

Important

Pour supprimer vos données sérialisées après un certain temps, définissez une configuration à durée de vie sur votre compartiment S3.

Note

Les fichiers sérialisés avec le module Python pickle peuvent être moins portables que d'autres formats de donnéesCSV, notamment Parquet et. JSON Méfiez-vous du chargement de fichiers pickle provenant de sources inconnues.

Pour plus d'informations sur les éléments à inclure dans un fichier de configuration pour une fonction distante, consultez Fichier de configuration.

Accès à vos données sérialisées

Les administrateurs peuvent définir les paramètres de vos données sérialisées, notamment leur emplacement et tout paramètre de chiffrement dans un fichier de configuration. Par défaut, les données sérialisées sont chiffrées avec une clé AWS Key Management Service (AWS KMS). Les administrateurs peuvent également restreindre l'accès au répertoire racine que vous spécifiez dans votre fichier de configuration à l'aide d'une politique de compartiment. Le fichier de configuration peut être partagé et utilisé entre les projets et les tâches. Pour plus d'informations, consultez Fichier de configuration.

Utilisez le RemoteExecutor API pour appeler une fonction

Vous pouvez utiliser le RemoteExecutor API pour appeler une fonction. SageMaker Python SDK transformera le code contenu dans l'RemoteExecutorappel en tâche SageMaker de formation. La tâche d'entraînement invoquera ensuite la fonction en tant qu'opération asynchrone et renverra un objet Future. Si vous utilisez le RemoteExecutorAPI, vous pouvez exécuter plusieurs tâches de formation en parallèle. Pour plus d'informations sur les objets Future dans Python, consultez Futures.

L'exemple de code suivant montre comment importer les bibliothèques requises, définir une fonction, démarrer une SageMaker instance et utiliser le API pour soumettre une demande d'exécution de 2 tâches en parallèle.

from sagemaker.remote_function import RemoteExecutor def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) with RemoteExecutor(max_parallel_job=2, instance_type="ml.m5.large") as e: future = e.submit(matrix_multiply, a, b) assert (future.result() == np.array([1,2])).all()

La classe RemoteExecutor est une implémentation de la bibliothèque concurrent.futures.Executor.

L'exemple de code suivant montre comment définir et appeler une fonction avec RemoteExecutorAPI. Dans cet exemple, RemoteExecutor soumettra 4 tâches au total, mais uniquement 2 en parallèle. Les deux dernières tâches réutiliseront les clusters avec une surcharge minimale.

from sagemaker.remote_function.client import RemoteExecutor def divide(a, b): return a/b with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e: futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]] for future in futures: print(future.result())

Le paramètre max_parallel_job sert uniquement de mécanisme de limitation du débit sans optimiser l'allocation des ressources de calcul. Dans l'exemple de code précédent, RemoteExecutor ne réserve pas de ressources de calcul pour les deux tâches parallèles avant que les tâches ne soient soumises. Pour plus d'informations sur max_parallel_job ou sur d'autres paramètres du décorateur @remote, consultez Spécification des classes et méthodes de fonctions distantes (langue française non garantie).

Cours du futur pour RemoteExecutor API

Une classe Future est une classe publique qui représente la fonction de retour de la tâche d'entraînement lorsqu'elle est invoquée de manière asynchrone. La classe Future implémente la classe concurrent.futures.Future. Cette classe peut être utilisée pour effectuer des opérations sur la tâche sous-jacente et charger des données en mémoire.