Richiama una funzione remota - Amazon SageMaker

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Richiama una funzione remota

Per richiamare una funzione all'interno del decoratore @remote, utilizza uno dei metodi seguenti:

Se utilizzi il metodo del decoratore @remote per richiamare una funzione, il processo di addestramento attenderà il completamento della funzione prima di iniziare una nuova attività. Tuttavia, se si utilizza il RemoteExecutorAPI, è possibile eseguire più di un processo in parallelo. Le sezioni seguenti mostrano entrambi i modi di richiamare una funzione.

Utilizzo di un decoratore @remote per richiamare una funzione

È possibile utilizzare il decoratore @remote per annotare una funzione. SageMaker trasformerà il codice all'interno del decoratore in un SageMaker lavoro di formazione. Il processo di addestramento richiamerà quindi la funzione all'interno del decoratore e attenderà il completamento del processo. Il seguente esempio di codice mostra come importare le librerie richieste, avviare un' SageMakeristanza e annotare una moltiplicazione di matrici con il decoratore @remote.

from sagemaker.remote_function import remote import numpy as np @remote(instance_type="ml.m5.large") def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) assert (matrix_multiply(a, b) == np.array([1,2])).all()

Il decoratore è definito nel modo seguente.

def remote( *, **kwarg): ...

Quando si richiama una funzione decorata, SageMaker SDK Python carica tutte le eccezioni sollevate da un errore nella memoria locale. Nel seguente esempio di codice, la prima chiamata alla funzione “divide” viene completata correttamente e il risultato viene caricato nella memoria locale. Nella seconda chiamata alla funzione “divide”, il codice restituisce un errore che viene caricato nella memoria locale.

from sagemaker.remote_function import remote import pytest @remote() def divide(a, b): return a/b # the underlying job is completed successfully # and the function return is loaded assert divide(10, 5) == 2 # the underlying job fails with "AlgorithmError" # and the function exception is loaded into local memory with pytest.raises(ZeroDivisionError): divide(10, 0)
Nota

La funzione decorata viene eseguita come processo remoto. Se il thread viene interrotto, il processo sottostante non verrà interrotto.

Come modificare il valore di una variabile locale

La funzione decoratore viene eseguita su una macchina remota. La modifica di una variabile non locale o degli argomenti di input all'interno di una funzione decorata non modificherà il valore locale.

Nell'esempio di codice seguente, una lista e un “dict” vengono aggiunti all'interno della funzione decoratore. Questo non cambia quando viene invocata la funzione decoratore.

a = [] @remote def func(): a.append(1) # when func is invoked, a in the local memory is not modified func() func() # a stays as [] a = {} @remote def func(a): # append new values to the input dictionary a["key-2"] = "value-2" a = {"key": "value"} func(a) # a stays as {"key": "value"}

Per modificare il valore di una variabile locale dichiarata all'interno di una funzione decoratore, recupera la variabile dalla funzione. Il seguente esempio di codice mostra che il valore di una variabile locale viene modificato quando viene restituito dalla funzione.

a = {"key-1": "value-1"} @remote def func(a): a["key-2"] = "value-2" return a a = func(a) -> {"key-1": "value-1", "key-2": "value-2"}

Serializzazione e deserializzazione dei dati

Quando si richiama una funzione remota, serializza SageMaker automaticamente gli argomenti della funzione durante le fasi di input e output. Gli argomenti e i ritorni delle funzioni vengono serializzati utilizzando cloudpickle. SageMaker supporta la serializzazione dei seguenti oggetti e funzioni Python.

  • Oggetti Python integrati, tra cui dicts, lists, floats, ints, strings, valori booleani e tuples

  • Matrici Numpy

  • Pandas DataFrame

  • Set di dati e strumenti di valutazione Scikit-Learn

  • PyTorch modelli

  • TensorFlow modelli

  • La classe Booster per XGBoost

Quanto segue può essere usato con alcune limitazioni.

  • Dask DataFrames

  • La classe XGBoost Dmatrix

  • TensorFlow set di dati e sottoclassi

  • PyTorch modelli

La sezione seguente contiene le migliori pratiche per l'utilizzo delle precedenti classi Python con alcune limitazioni nella funzione remota, informazioni su dove vengono SageMaker archiviati i dati serializzati e su come gestirne l'accesso.

Le migliori pratiche per le classi Python con supporto limitato per la serializzazione remota dei dati

È possibile utilizzare le classi Python elencate in questa sezione, con limitazioni. Le sezioni successive illustrano le migliori pratiche per l'utilizzo delle seguenti classi Python.

  • Task DataFrames

  • La XGBoost DMatric classe

  • TensorFlow set di dati e sottoclassi

  • PyTorch modelli

Dask è una libreria open source utilizzata per il calcolo parallelo in Python. In questa sezione viene illustrato quanto segue.

  • Come passare un Dask DataFrame alla tua funzione remota

  • Come convertire le statistiche di riepilogo da un Dask a un DataFrame Pandas DataFrame

Come passare un Dask DataFrame alla tua funzione remota

I Dask DataFrames vengono spesso utilizzati per elaborare set di dati di grandi dimensioni perché possono contenere set di dati che richiedono più memoria di quella disponibile. Questo perché un Dask DataFrame non carica i dati locali in memoria. Se si passa un Dask DataFrame come argomento di funzione alla funzione remota, Dask può passare un riferimento ai dati nel disco locale o nell'archivio cloud, anziché ai dati stessi. Il codice seguente mostra un esempio di passaggio di un Dask DataFrame all'interno della funzione remota che funzionerà a vuoto. DataFrame

#Do not pass a Dask DataFrame to your remote function as follows def clean(df: dask.DataFrame ): cleaned = df[] \ ...

Dask caricherà i dati dal Dask DataFrame in memoria solo quando si utilizza il. DataFrame Se desideri utilizzare un Dask DataFrame all'interno di una funzione remota, fornisci il percorso dei dati. Dopodiché, Dask leggerà il set di dati direttamente dal percorso dei dati specificato durante l'esecuzione del codice.

Il seguente esempio di codice mostra come utilizzare un Dask DataFrame all'interno della funzione remota. clean Nell'esempio di codice, raw_data_path viene passato a clean anziché a DataFrame Dask. Quando il codice viene eseguito, il set di dati viene letto direttamente dalla posizione di un bucket Amazon S3 specificato in raw_data_path. Quindi la persist funzione mantiene il set di dati in memoria per facilitare la random_split funzione successiva e riscrive nel percorso dei dati di output in un bucket S3 utilizzando le funzioni Dask. DataFrame API

import dask.dataframe as dd @remote( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) #pass the data path to your remote function rather than the Dask DataFrame itself def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]): df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\ .drop(['column_b', 'column_c'], axis=1)\ .persist() #keep the data in memory to facilitate the following random_split operation train_df, test_df = cleaned.random_split(split_ratio, random_state=10) train_df.to_parquet(os.path.join(output_data_path, 'train') test_df.to_parquet(os.path.join(output_data_path, 'test')) clean("s3://amzn-s3-demo-bucket/raw/", "s3://amzn-s3-demo-bucket/cleaned/", split_ratio=[0.7, 0.3])
Come convertire le statistiche di riepilogo da un Dask in un Pandas DataFrame DataFrame

Le statistiche di riepilogo di un Dask DataFrame possono essere convertite in un Pandas DataFrame richiamando il compute metodo come mostrato nel seguente codice di esempio. Nell'esempio, il bucket S3 contiene un Dask di grandi dimensioni DataFrame che non può entrare nella memoria o in un dataframe Pandas. Nell'esempio seguente, una funzione remota esegue la scansione del set di dati e restituisce un Dask contenente le statistiche di output da un Pandas DataFrame. describe DataFrame

executor = RemoteExecutor( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) future = executor.submit(lambda: dd.read_parquet("s3://amzn-s3-demo-bucket/raw/").describe().compute()) future.result()

DMatrixè una struttura dati interna utilizzata da XGBoost per caricare i dati. Un DMatrix oggetto non può essere messo in salamoia per spostarsi facilmente tra le sessioni di calcolo. Il passaggio diretto DMatrix delle istanze avrà esito negativo con un. SerializationError

Come passare un oggetto di dati alla funzione remota e allenarsi con XGBoost

Per convertire un Pandas DataFrame in un'DMatrixistanza e utilizzarlo per allenarsi nella funzione remota, passalo direttamente alla funzione remota come mostrato nel seguente esempio di codice.

import xgboost as xgb @remote def train(df, params): #Convert a pandas dataframe into a DMatrix DataFrame and use it for training dtrain = DMatrix(df) return xgb.train(dtrain, params)

TensorFlow i set di dati e le sottoclassi sono oggetti interni utilizzati per caricare i dati durante TensorFlow l'addestramento. TensorFlow i set di dati e le sottoclassi non possono essere raggruppati per spostarsi facilmente tra le sessioni di elaborazione. La trasmissione diretta di set di dati o sottoclassi Tensorflow fallirà con un SerializationError. Usa l'I/O Tensorflow APIs per caricare i dati dallo storage, come mostrato nel seguente esempio di codice.

import tensorflow as tf import tensorflow_io as tfio @remote def train(data_path: str, params): dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt")) ... train("s3://amzn-s3-demo-bucket/data", {})

PyTorch i modelli sono serializzabili e possono essere passati tra l'ambiente locale e la funzione remota. Se l'ambiente locale e l'ambiente remoto hanno tipi di dispositivi diversi, ad esempio (GPUseCPUs), non è possibile restituire un modello addestrato all'ambiente locale. Ad esempio, se il codice seguente viene sviluppato in un ambiente locale senza GPUs ma eseguito in un'istanza conGPUs, la restituzione diretta del modello addestrato porterà a unDeserializationError.

# Do not return a model trained on GPUs to a CPU-only environment as follows @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") # a device without GPU capabilities model = Net().to(device) # train the model ... return model model = train(...) #returns a DeserializationError if run on a device with GPU

Per restituire un modello addestrato in un GPU ambiente che contiene solo CPU funzionalità, utilizzate APIs direttamente l'I/O del PyTorch modello, come mostrato nell'esempio di codice riportato di seguito.

import s3fs model_path = "s3://amzn-s3-demo-bucket/folder/" @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") model = Net().to(device) # train the model ... fs = s3fs.FileSystem() with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file: torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU) train(...) #use the model to train on either CPUs or GPUs model = Net() fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file: model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))

Dove vengono SageMaker archiviati i dati serializzati

Quando si richiama una funzione remota, serializza SageMaker automaticamente gli argomenti della funzione e restituisce i valori durante le fasi di input e output. Questi dati serializzati vengono archiviati in una directory principale nel bucket S3. La directory principale, <s3_root_uri>, viene specificata in un file di configurazione. Il parametro job_name viene generato automaticamente per l'utente.

Nella directory principale, SageMaker crea una <job_name> cartella che contiene la directory di lavoro corrente, la funzione serializzata, gli argomenti della funzione serializzata, i risultati e tutte le eccezioni derivanti dall'invocazione della funzione serializzata.

Sotto <job_name>, la directory workdir contiene un archivio compresso della directory di lavoro corrente. L'archivio compresso include tutti i file Python nella directory di lavoro e il file requirements.txt, che specifica tutte le dipendenze necessarie per eseguire la funzione remota.

Di seguito è riportato un esempio della struttura delle cartelle in un bucket S3 specificato nel file di configurazione.

<s3_root_uri>/ # specified by s3_root_uri or S3RootUri <job_name>/ #automatically generated for you workdir/workspace.zip # archive of the current working directory (workdir) function/ # serialized function arguments/ # serialized function arguments results/ # returned output from the serialized function including the model exception/ # any exceptions from invoking the serialized function

La directory principale specificata nel bucket S3 non è pensata per l'archiviazione a lungo termine. I dati serializzati sono strettamente legati alla versione Python e alla versione del framework di machine learning (ML) utilizzate durante la serializzazione. Se aggiorni la versione Python o il framework ML, potrebbe non essere possibile utilizzare i dati serializzati. Effettua invece le seguenti operazioni.

  • Archivia il modello e gli artefatti del modello in un formato indipendente dalla versione di Python e dal framework ML.

  • Se aggiorni il framework Python o ML, accedi ai risultati del modello dall'archiviazione a lungo termine.

Importante

Per eliminare i dati serializzati dopo un determinato periodo di tempo, imposta una configurazione a vita sul bucket S3.

Nota

I file serializzati con il modulo pickle Python possono essere meno portabili rispetto ad altri formati di dati, CSV tra cui Parquet e. JSON Fai attenzione a non caricare file raggruppati da fonti sconosciute.

Per ulteriori informazioni su cosa includere in un file di configurazione per una funzione remota, consulta File di configurazione.

Accesso ai dati serializzati

Gli amministratori possono fornire le impostazioni per i dati serializzati, inclusa la posizione e le eventuali impostazioni di crittografia in un file di configurazione. Per impostazione predefinita, i dati serializzati vengono crittografati con una AWS Key Management Service chiave ().AWS KMS Gli amministratori possono anche limitare l'accesso alla directory principale specificata nel file di configurazione con una policy bucket. Il file di configurazione può essere condiviso e utilizzato tra progetti e processi. Per ulteriori informazioni, consulta File di configurazione.

Utilizzate il RemoteExecutor API per richiamare una funzione

È possibile utilizzare il RemoteExecutor API per richiamare una funzione. SageMaker Python SDK trasformerà il codice contenuto nella RemoteExecutor chiamata in un processo di SageMaker formazione. Il processo di addestramento richiamerà quindi la funzione come operazione asincrona e restituirà un futuro. Se si utilizza il RemoteExecutorAPI, è possibile eseguire più di un processo di formazione in parallelo. Per ulteriori informazioni sugli oggetti futuri in Python, consulta Futures.

Il seguente esempio di codice mostra come importare le librerie richieste, definire una funzione, avviare un' SageMaker istanza e utilizzarle API per inviare una richiesta per eseguire 2 lavori in parallelo.

from sagemaker.remote_function import RemoteExecutor def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) with RemoteExecutor(max_parallel_job=2, instance_type="ml.m5.large") as e: future = e.submit(matrix_multiply, a, b) assert (future.result() == np.array([1,2])).all()

La classe RemoteExecutor è un'implementazione della libreria concurrent.futures.Executor.

Il seguente esempio di codice mostra come definire una funzione e richiamarla utilizzando RemoteExecutorAPI. In questo esempio, RemoteExecutor invierà 4 processi in totale, ma solo 2 in parallelo. Gli ultimi due processi riutilizzeranno i cluster con un sovraccarico minimo.

from sagemaker.remote_function.client import RemoteExecutor def divide(a, b): return a/b with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e: futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]] for future in futures: print(future.result())

Il parametro max_parallel_job funge solo da meccanismo di limitazione della velocità senza ottimizzare l'allocazione delle risorse di calcolo. Nell'esempio di codice precedente, RemoteExecutor non riserva risorse di calcolo per i due processi paralleli prima dell'invio di qualsiasi processo. Per ulteriori informazioni su max_parallel_job o altri parametri per il decoratore @remote, consulta Remote function classes and methods specification.

Classe futura per RemoteExecutor API

Una classe futura è una classe pubblica che rappresenta la funzione di ritorno dal processo di addestramento quando viene richiamata in modo asincrono. La classe futura implementa la classe concurrent.futures.Future. Questa classe può essere utilizzata per eseguire operazioni sul processo sottostante e caricare dati in memoria.