Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Modificare uno script di addestramento TensorFlow
In questa sezione, imparerai come modificare gli script di TensorFlow addestramento per configurare la libreria di parallelismo dei SageMaker modelli per il partizionamento automatico e il partizionamento manuale. Gli esempi scelti comprendono anche un esempio integrato con Horovod per il modello ibrido e il parallelismo dei dati.
Nota
Per scoprire quali TensorFlow versioni sono supportate dalla libreria, consulta. Framework e Regioni AWS supportati
Le modifiche necessarie da apportare allo script di addestramento per utilizzare la libreria sono elencate in Divisione automatica con TensorFlow.
Per maggiori informazioni su come modificare lo script di addestramento per utilizzare il modello ibrido e il parallelismo dei dati con Horovod, consulta Suddivisione automatizzata con TensorFlow e Horovod per modelli ibridi e parallelismo dei dati.
Se desideri utilizzare il partizionamento manuale, consulta anche Divisione manuale con TensorFlow.
I seguenti argomenti mostrano esempi di script di addestramento che è possibile utilizzare per configurare la libreria di parallelismo SageMaker dei modelli per il partizionamento automatico e i modelli di partizionamento manuale. TensorFlow
Nota
Il partizionamento automatico è abilitato come impostazione predefinita. Se non diversamente specificato, gli script di esempio utilizzano il partizionamento automatico.
Argomenti
Divisione automatica con TensorFlow
Le seguenti modifiche allo script di addestramento sono necessarie per eseguire un TensorFlow modello con la libreria SageMaker di parallelismo dei modelli:
-
Importa e inizializza la libreria con
smp.init()
. -
Definisci un modello Keras ereditandolo da
smp.DistributedModel
anziché dalla classe di modelli Keras. Restituisci gli output del modello dal metodo di chiamata dell'oggetto smp.DistributedModel
. Tieni presente che tutti i tensori restituiti dal metodo di chiamata verranno trasmessi su dispositivi paralleli al modello, con un sovraccarico di comunicazione, quindi non dovrebbe essere restituito alcun tensore che non è necessario al di fuori del metodo di chiamata (come le attivazioni intermedie) . -
Imposta
drop_remainder=True
nel metodotf.Dataset.batch()
. Ciò serve a garantire che la dimensione del batch sia sempre divisibile per il numero di microbatch. -
Fornisci i seed alle operazioni casuali nella pipeline di dati utilizzando
smp.dp_rank()
, ad esempio,shuffle(ds, seed=smp.dp_rank())
per garantire la coerenza dei campioni di dati tra le GPU che contengono partizioni di modelli diversi. -
Inserisci la logica forward e backward in una funzione a fasi e decorala con
smp.step
. -
Esegui la post-elaborazione sugli output tra i microbatch utilizzando metodi
StepOutput
come reduce_mean
. La funzionesmp.step
deve restituire un valore che dipende dall'output di smp.DistributedModel
. -
Se è presente una fase di valutazione, inserisci in modo analogo la logica forward all'interno di una funzione decorata
smp.step
e post-elabora gli output utilizzando l'APIStepOutput
.
Il seguente script Python è un esempio di script di addestramento dopo l'introduzione delle modifiche.
import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
Se hai finito di preparare lo script di addestramento, procedi con Fase 2: Avviare un job di formazione utilizzando SageMaker Python SDK. Se desideri eseguire un modello ibrido e un processo di addestramento per il parallelismo dei dati, continua con la sezione successiva.
Suddivisione automatizzata con TensorFlow e Horovod per modelli ibridi e parallelismo dei dati
Puoi utilizzare la libreria di parallelismo dei SageMaker modelli con Horovod per il parallelismo ibrido di modelli e dati. Per saperne di più su come la libreria suddivide un modello per il parallelismo ibrido, consulta PyTorch TensorFlowParallelismo delle pipeline (disponibile per e).
In questo passaggio, ci concentriamo su come modificare lo script di addestramento per adattare la libreria di parallelismo dei modelli. SageMaker
Per configurare correttamente il proprio script di addestramento per acquisire la configurazione di parallelismo ibrido che sarà impostata in Fase 2: Avviare un job di formazione utilizzando SageMaker Python SDK, usa le funzioni di supporto della libreria, smp.dp_rank()
e smp.mp_rank()
, che rilevano automaticamente rispettivamente la classificazione parallela dei dati e la classificazione parallela dei modelli.
Per trovare tutte le primitive MPI supportate dalla libreria, consulta MPI Basics
Le modifiche richieste nello script sono:
-
Aggiunta di
hvd.allreduce
-
Variabili di trasmissione dopo il primo batch, come richiesto da Horovod
-
Operazioni di partizionamento e/o shuffling dei seed nella pipeline di dati con
smp.dp_rank()
.
Nota
Quando usi Horovod, non devi chiamare hvd.init
direttamente nel tuo script di addestramento. Dovrai invece "horovod"
impostarlo True
nei parametri dell'SDK modelparallel
di SageMaker Python in. Fase 2: Avviare un job di formazione utilizzando SageMaker Python SDK Ciò consente alla libreria di inizializzare internamente Horovod in base alle assegnazioni dei dispositivi delle partizioni del modello. La chiamata di hvd.init()
direttamente nello script di addestramento può causare problemi.
Nota
L'utilizzo dell'API hvd.DistributedOptimizer
direttamente nello script di addestramento potrebbe comportare velocità e prestazioni di addestramento scadenti, poiché l'API inserisce implicitamente l'operazione AllReduce
all'interno di smp.step
. Ti consigliamo di utilizzare la libreria di parallelismo dei modelli con Horovod chiamando direttamente hvd.allreduce
dopo aver chiamato accumulate()
o reduce_mean()
sui gradienti restituiti da smp.step
, come verrà mostrato nell'esempio seguente.
import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))
Divisione manuale con TensorFlow
Usa i gestori di contesto smp.partition
per inserire le operazioni in una partizione specifica. Qualsiasi operazione non inserita in alcun contesto smp.partition
viene inserita in default_partition
. Per ulteriori informazioni sull'API della libreria SageMaker di parallelismo dei modelli, consulta la documentazione dell'API.
import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
Funzionalità del framework non supportate
Le seguenti TensorFlow funzionalità non sono supportate dalla libreria:
-
tf.GradientTape()
non è attualmente supportato. Puoi invece usareOptimizer.get_gradients()
oOptimizer.compute_gradients()
per calcolare i gradienti. -
L'API
tf.train.Checkpoint.restore()
non è attualmente supportata. Per il checkpoint, usa invecesmp.CheckpointManager
, che fornisce la stessa API e funzionalità. Tieni presente che il ripristino dei checkpoint consmp.CheckpointManager
dovrebbe avvenire dopo la prima fase.