Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Richiama una funzione remota
Per richiamare una funzione all'interno del decoratore @remote, utilizza uno dei metodi seguenti:
Se utilizzi il metodo del decoratore @remote per richiamare una funzione, il processo di addestramento attenderà il completamento della funzione prima di iniziare una nuova attività. Tuttavia, se si utilizza il RemoteExecutor
API, è possibile eseguire più di un processo in parallelo. Le sezioni seguenti mostrano entrambi i modi di richiamare una funzione.
Utilizzo di un decoratore @remote per richiamare una funzione
È possibile utilizzare il decoratore @remote per annotare una funzione. SageMaker trasformerà il codice all'interno del decoratore in un SageMaker lavoro di formazione. Il processo di addestramento richiamerà quindi la funzione all'interno del decoratore e attenderà il completamento del processo. Il seguente esempio di codice mostra come importare le librerie richieste, avviare un' SageMakeristanza e annotare una moltiplicazione di matrici con il decoratore @remote.
from sagemaker.remote_function import remote import numpy as np @remote(instance_type="
ml.m5.large
") def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) assert (matrix_multiply(a, b) == np.array([1,2])).all()
Il decoratore è definito nel modo seguente.
def remote( *, **kwarg): ...
Quando si richiama una funzione decorata, SageMaker SDK Python carica tutte le eccezioni sollevate da un errore nella memoria locale. Nel seguente esempio di codice, la prima chiamata alla funzione “divide” viene completata correttamente e il risultato viene caricato nella memoria locale. Nella seconda chiamata alla funzione “divide”, il codice restituisce un errore che viene caricato nella memoria locale.
from sagemaker.remote_function import remote import pytest @remote() def divide(a, b): return a/b # the underlying job is completed successfully # and the function return is loaded assert divide(10, 5) == 2 # the underlying job fails with "AlgorithmError" # and the function exception is loaded into local memory with pytest.raises(ZeroDivisionError): divide(10, 0)
Nota
La funzione decorata viene eseguita come processo remoto. Se il thread viene interrotto, il processo sottostante non verrà interrotto.
Come modificare il valore di una variabile locale
La funzione decoratore viene eseguita su una macchina remota. La modifica di una variabile non locale o degli argomenti di input all'interno di una funzione decorata non modificherà il valore locale.
Nell'esempio di codice seguente, una lista e un “dict” vengono aggiunti all'interno della funzione decoratore. Questo non cambia quando viene invocata la funzione decoratore.
a = [] @remote def func(): a.append(1) # when func is invoked, a in the local memory is not modified func() func() # a stays as [] a = {} @remote def func(a): # append new values to the input dictionary a["key-2"] = "value-2" a = {"key": "value"} func(a) # a stays as {"key": "value"}
Per modificare il valore di una variabile locale dichiarata all'interno di una funzione decoratore, recupera la variabile dalla funzione. Il seguente esempio di codice mostra che il valore di una variabile locale viene modificato quando viene restituito dalla funzione.
a = {"key-1": "value-1"} @remote def func(a): a["key-2"] = "value-2" return a a = func(a) -> {"key-1": "value-1", "key-2": "value-2"}
Serializzazione e deserializzazione dei dati
Quando si richiama una funzione remota, serializza SageMaker automaticamente gli argomenti della funzione durante le fasi di input e output. Gli argomenti e i ritorni delle funzioni vengono serializzati utilizzando cloudpickle.
-
Oggetti Python integrati, tra cui dicts, lists, floats, ints, strings, valori booleani e tuples
-
Matrici Numpy
-
Pandas DataFrame
-
Set di dati e strumenti di valutazione Scikit-Learn
-
PyTorch modelli
-
TensorFlow modelli
-
La classe Booster per XGBoost
Quanto segue può essere usato con alcune limitazioni.
-
Dask DataFrames
-
La classe XGBoost Dmatrix
-
TensorFlow set di dati e sottoclassi
-
PyTorch modelli
La sezione seguente contiene le migliori pratiche per l'utilizzo delle precedenti classi Python con alcune limitazioni nella funzione remota, informazioni su dove vengono SageMaker archiviati i dati serializzati e su come gestirne l'accesso.
Le migliori pratiche per le classi Python con supporto limitato per la serializzazione remota dei dati
È possibile utilizzare le classi Python elencate in questa sezione, con limitazioni. Le sezioni successive illustrano le migliori pratiche per l'utilizzo delle seguenti classi Python.
-
Task
DataFrames -
La XGBoost DMatric classe
-
TensorFlow set di dati e sottoclassi
-
PyTorch modelli
Dask
-
Come passare un Dask DataFrame alla tua funzione remota
-
Come convertire le statistiche di riepilogo da un Dask a un DataFrame Pandas DataFrame
Come passare un Dask DataFrame alla tua funzione remota
I Dask DataFrames
#Do not pass a Dask DataFrame to your remote function as follows def clean(df: dask.DataFrame ): cleaned = df[] \ ...
Dask caricherà i dati dal Dask DataFrame in memoria solo quando si utilizza il. DataFrame Se desideri utilizzare un Dask DataFrame all'interno di una funzione remota, fornisci il percorso dei dati. Dopodiché, Dask leggerà il set di dati direttamente dal percorso dei dati specificato durante l'esecuzione del codice.
Il seguente esempio di codice mostra come utilizzare un Dask DataFrame all'interno della funzione remota. clean
Nell'esempio di codice, raw_data_path
viene passato a clean anziché a DataFrame Dask. Quando il codice viene eseguito, il set di dati viene letto direttamente dalla posizione di un bucket Amazon S3 specificato in raw_data_path
. Quindi la persist
funzione mantiene il set di dati in memoria per facilitare la random_split
funzione successiva e riscrive nel percorso dei dati di output in un bucket S3 utilizzando le funzioni Dask. DataFrame API
import dask.dataframe as dd @remote( instance_type='
ml.m5.24xlarge
', volume_size=300
, keep_alive_period_in_seconds=600
) #pass the data path to your remote function rather than the Dask DataFrame itself def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]): df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\ .drop(['column_b', 'column_c'], axis=1)\ .persist() #keep the data in memory to facilitate the following random_split operation train_df, test_df = cleaned.random_split(split_ratio, random_state=10) train_df.to_parquet(os.path.join(output_data_path, 'train') test_df.to_parquet(os.path.join(output_data_path, 'test')) clean("s3://amzn-s3-demo-bucket/raw/
", "s3://amzn-s3-demo-bucket/cleaned/
", split_ratio=[0.7, 0.3]
)
Come convertire le statistiche di riepilogo da un Dask in un Pandas DataFrame DataFrame
Le statistiche di riepilogo di un Dask DataFrame possono essere convertite in un Pandas DataFrame richiamando il compute
metodo come mostrato nel seguente codice di esempio. Nell'esempio, il bucket S3 contiene un Dask di grandi dimensioni DataFrame che non può entrare nella memoria o in un dataframe Pandas. Nell'esempio seguente, una funzione remota esegue la scansione del set di dati e restituisce un Dask contenente le statistiche di output da un Pandas DataFrame. describe
DataFrame
executor = RemoteExecutor( instance_type='
ml.m5.24xlarge
', volume_size=300
, keep_alive_period_in_seconds=600
) future = executor.submit(lambda: dd.read_parquet("s3://amzn-s3-demo-bucket/raw/
").describe().compute()) future.result()
DMatrixè una struttura dati interna utilizzata da XGBoost per caricare i dati. Un DMatrix oggetto non può essere messo in salamoia per spostarsi facilmente tra le sessioni di calcolo. Il passaggio diretto DMatrix delle istanze avrà esito negativo con un. SerializationError
Come passare un oggetto di dati alla funzione remota e allenarsi con XGBoost
Per convertire un Pandas DataFrame in un'DMatrixistanza e utilizzarlo per allenarsi nella funzione remota, passalo direttamente alla funzione remota come mostrato nel seguente esempio di codice.
import xgboost as xgb @remote def train(df, params): #Convert a pandas dataframe into a DMatrix DataFrame and use it for training dtrain = DMatrix(df) return xgb.train(dtrain, params)
TensorFlow i set di dati e le sottoclassi sono oggetti interni utilizzati per caricare i dati durante TensorFlow l'addestramento. TensorFlow i set di dati e le sottoclassi non possono essere raggruppati per spostarsi facilmente tra le sessioni di elaborazione. La trasmissione diretta di set di dati o sottoclassi Tensorflow fallirà con un SerializationError
. Usa l'I/O Tensorflow APIs per caricare i dati dallo storage, come mostrato nel seguente esempio di codice.
import tensorflow as tf import tensorflow_io as tfio @remote def train(data_path: str, params): dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt")) ... train("
s3://amzn-s3-demo-bucket/data
", {})
PyTorch i modelli sono serializzabili e possono essere passati tra l'ambiente locale e la funzione remota. Se l'ambiente locale e l'ambiente remoto hanno tipi di dispositivi diversi, ad esempio (GPUseCPUs), non è possibile restituire un modello addestrato all'ambiente locale. Ad esempio, se il codice seguente viene sviluppato in un ambiente locale senza GPUs ma eseguito in un'istanza conGPUs, la restituzione diretta del modello addestrato porterà a unDeserializationError
.
# Do not return a model trained on GPUs to a CPU-only environment as follows @remote(instance_type='
ml.g4dn.xlarge
') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") # a device without GPU capabilities model = Net().to(device) # train the model ... return model model = train(...) #returns a DeserializationError if run on a device with GPU
Per restituire un modello addestrato in un GPU ambiente che contiene solo CPU funzionalità, utilizzate APIs direttamente l'I/O del PyTorch modello, come mostrato nell'esempio di codice riportato di seguito.
import s3fs model_path = "
s3://amzn-s3-demo-bucket/folder/
" @remote(instance_type='ml.g4dn.xlarge
') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") model = Net().to(device) # train the model ... fs = s3fs.FileSystem() with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file: torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU) train(...) #use the model to train on either CPUs or GPUs model = Net() fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file: model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))
Dove vengono SageMaker archiviati i dati serializzati
Quando si richiama una funzione remota, serializza SageMaker automaticamente gli argomenti della funzione e restituisce i valori durante le fasi di input e output. Questi dati serializzati vengono archiviati in una directory principale nel bucket S3. La directory principale, <s3_root_uri>
, viene specificata in un file di configurazione. Il parametro job_name
viene generato automaticamente per l'utente.
Nella directory principale, SageMaker crea una <job_name>
cartella che contiene la directory di lavoro corrente, la funzione serializzata, gli argomenti della funzione serializzata, i risultati e tutte le eccezioni derivanti dall'invocazione della funzione serializzata.
Sotto <job_name>
, la directory workdir
contiene un archivio compresso della directory di lavoro corrente. L'archivio compresso include tutti i file Python nella directory di lavoro e il file requirements.txt
, che specifica tutte le dipendenze necessarie per eseguire la funzione remota.
Di seguito è riportato un esempio della struttura delle cartelle in un bucket S3 specificato nel file di configurazione.
<s3_root_uri>
/ # specified by s3_root_uri or S3RootUri <job_name>/ #automatically generated for you workdir/workspace.zip # archive of the current working directory (workdir) function/ # serialized function arguments/ # serialized function arguments results/ # returned output from the serialized function including the model exception/ # any exceptions from invoking the serialized function
La directory principale specificata nel bucket S3 non è pensata per l'archiviazione a lungo termine. I dati serializzati sono strettamente legati alla versione Python e alla versione del framework di machine learning (ML) utilizzate durante la serializzazione. Se aggiorni la versione Python o il framework ML, potrebbe non essere possibile utilizzare i dati serializzati. Effettua invece le seguenti operazioni.
-
Archivia il modello e gli artefatti del modello in un formato indipendente dalla versione di Python e dal framework ML.
-
Se aggiorni il framework Python o ML, accedi ai risultati del modello dall'archiviazione a lungo termine.
Importante
Per eliminare i dati serializzati dopo un determinato periodo di tempo, imposta una configurazione a vita sul bucket S3.
Nota
I file serializzati con il modulo pickle Python
Per ulteriori informazioni su cosa includere in un file di configurazione per una funzione remota, consulta File di configurazione.
Accesso ai dati serializzati
Gli amministratori possono fornire le impostazioni per i dati serializzati, inclusa la posizione e le eventuali impostazioni di crittografia in un file di configurazione. Per impostazione predefinita, i dati serializzati vengono crittografati con una AWS Key Management Service chiave ().AWS KMS Gli amministratori possono anche limitare l'accesso alla directory principale specificata nel file di configurazione con una policy bucket. Il file di configurazione può essere condiviso e utilizzato tra progetti e processi. Per ulteriori informazioni, consulta File di configurazione.
Utilizzate il RemoteExecutor
API per richiamare una funzione
È possibile utilizzare il RemoteExecutor
API per richiamare una funzione. SageMaker Python SDK trasformerà il codice contenuto nella RemoteExecutor
chiamata in un processo di SageMaker formazione. Il processo di addestramento richiamerà quindi la funzione come operazione asincrona e restituirà un futuro. Se si utilizza il RemoteExecutor
API, è possibile eseguire più di un processo di formazione in parallelo. Per ulteriori informazioni sugli oggetti futuri in Python, consulta Futures
Il seguente esempio di codice mostra come importare le librerie richieste, definire una funzione, avviare un' SageMaker istanza e utilizzarle API per inviare una richiesta per eseguire 2
lavori in parallelo.
from sagemaker.remote_function import RemoteExecutor def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) with RemoteExecutor(max_parallel_job=2, instance_type="
ml.m5.large
") as e: future = e.submit(matrix_multiply, a, b) assert (future.result() == np.array([1,2])).all()
La classe RemoteExecutor
è un'implementazione della libreria concurrent.futures.Executor
Il seguente esempio di codice mostra come definire una funzione e richiamarla utilizzando RemoteExecutorAPI
. In questo esempio, RemoteExecutor
invierà 4
processi in totale, ma solo 2
in parallelo. Gli ultimi due processi riutilizzeranno i cluster con un sovraccarico minimo.
from sagemaker.remote_function.client import RemoteExecutor def divide(a, b): return a/b with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e: futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]] for future in futures: print(future.result())
Il parametro max_parallel_job
funge solo da meccanismo di limitazione della velocità senza ottimizzare l'allocazione delle risorse di calcolo. Nell'esempio di codice precedente, RemoteExecutor
non riserva risorse di calcolo per i due processi paralleli prima dell'invio di qualsiasi processo. Per ulteriori informazioni su max_parallel_job
o altri parametri per il decoratore @remote, consulta Remote function classes and methods specification
Classe futura per RemoteExecutor
API
Una classe futura è una classe pubblica che rappresenta la funzione di ritorno dal processo di addestramento quando viene richiamata in modo asincrono. La classe futura implementa la classe concurrent.futures.Future