Modifier un script TensorFlow d'entraînement - Amazon SageMaker

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Modifier un script TensorFlow d'entraînement

Dans cette section, vous apprendrez à modifier les scripts d' TensorFlow apprentissage afin de configurer la bibliothèque de parallélisme des SageMaker modèles pour le partitionnement automatique et le partitionnement manuel. Cette sélection d'exemples inclut également un exemple intégré à Horovod pour le modèle hybride et le parallélisme des données.

Note

Pour connaître les TensorFlow versions prises en charge par la bibliothèque, consultezCadres pris en et Régions AWS.

Les modifications que vous devez apporter à votre script d'entraînement pour utiliser la bibliothèque sont répertoriées dans Fractionnement automatique avec TensorFlow.

Pour savoir comment modifier votre script d'entraînement pour utiliser un modèle hybride et le parallélisme de données avec Horovod, consultez Division automatisée avec Horovod TensorFlow et Horovod pour le parallélisme des modèles hybrides et des données.

Si vous optez pour le partitionnement manuel, consultez également Découpage manuel avec TensorFlow.

Les rubriques suivantes présentent des exemples de scripts de formation que vous pouvez utiliser pour configurer la bibliothèque SageMaker de parallélisme des modèles pour le partitionnement automatique et les modèles de partitionnement manuel. TensorFlow

Note

Le partitionnement automatique est activé par défaut. Sauf indication contraire, les exemples de scripts utilisent le partitionnement automatique.

Fractionnement automatique avec TensorFlow

Les modifications de script d'entraînement suivantes sont nécessaires pour exécuter un TensorFlow modèle avec SageMaker la bibliothèque de parallélisme des modèles :

  1. Importez et initialisez la bibliothèque avec smp.init().

  2. Définissez un modèle Keras en héritant de smp.DistributedModel au lieu de la classe de modèles Keras. Renvoyez les sorties du modèle à partir de la méthode d'appel de l'objet smp.DistributedModel. N'oubliez pas que tous les tenseurs renvoyés par la méthode d'appel seront diffusés sur des périphériques avec parallélisme des modèles. Comme cela induira un surdébit de communication, évitez de renvoyer les tenseurs qui ne sont pas nécessaires en dehors de la méthode d'appel (activations intermédiaires, par exemple).

  3. Définissez drop_remainder=True dans la méthode tf.Dataset.batch(). Cela vise à garantir que la taille du lot est toujours divisible par le nombre de micro-lots.

  4. Répartissez les opérations aléatoires dans le pipeline de données en utilisant un smp.dp_rank(), par exemple shuffle(ds, seed=smp.dp_rank()), pour assurer la cohérence des échantillons de données entre les GPU contenant différentes partitions de modèle.

  5. Mettez la logique en avant et en arrière dans une fonction étape et décorez-la avec smp.step.

  6. Effectuez un post-traitement sur les sorties des différents micro-lots à l'aide de méthodes StepOutput telles que reduce_mean. La fonction smp.step doit avoir une valeur de retour qui dépend de la sortie de smp.DistributedModel.

  7. De façon similaire, s'il y a une étape d'évaluation, placez la logique en avant dans une fonction décorée smp.step et post-traitez les sorties en utilisant l'API StepOutput.

Pour en savoir plus sur l'API SageMaker de la bibliothèque de parallélisme des modèles, consultez la documentation de l'API.

Le script Python suivant est un exemple de script d'entraînement après application des modifications.

import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

Si vous avez fini de préparer votre scénario d'entraînement, passez à Étape 2 : Lancer un job de formation à l'aide du SDK SageMaker Python. Si vous souhaitez exécuter une tâche d'entraînement parallèle modèle et données hybride, passez à la section suivante.

Division automatisée avec Horovod TensorFlow et Horovod pour le parallélisme des modèles hybrides et des données

Vous pouvez utiliser la bibliothèque de parallélisme de SageMaker modèles avec Horovod pour le parallélisme de modèles hybrides et de données. Pour en savoir plus sur la façon dont la bibliothèque divise un modèle pour le parallélisme hybride, reportez-vous à Parallélisme du pipeline (disponible pour PyTorch et) TensorFlow.

Dans cette étape, nous nous concentrons sur la manière de modifier votre script d'entraînement afin d'adapter la bibliothèque de parallélisme du SageMaker modèle.

Pour configurer correctement votre script d'entraînement afin qu'il prenne en compte la configuration du parallélisme hybride que vous définirez dans Étape 2 : Lancer un job de formation à l'aide du SDK SageMaker Python, utilisez les fonctions d'aide de la bibliothèque, smp.dp_rank() et smp.mp_rank(), qui détectent automatiquement le rang parallèle des données et le rang parallèle du modèle, respectivement.

Pour trouver toutes les primitives MPI prises en charge par la bibliothèque, consultez les bases du MPI dans la documentation du SDK SageMaker Python.

Les modifications à apporter au script sont les suivantes :

  • Ajouter hvd.allreduce

  • Diffuser des variables après le premier lot, comme l'exige Horovod

  • Répartir des opérations de remaniement et/ou de partitionnement dans le pipeline de données avec smp.dp_rank().

Note

Lorsque vous utilisez Horovod, vous ne devez pas faire appel directement à hvd.init dans votre script d'entraînement. Au lieu de cela, vous devrez le "horovod" définir True dans les modelparallel paramètres du SDK SageMaker Python dansÉtape 2 : Lancer un job de formation à l'aide du SDK SageMaker Python. Cela permet à la bibliothèque d'initialiser Horovod en interne en se basant sur les affectations de périphériques des partitions du modèle. Le fait d'appeler directement hvd.init() dans votre script d'entraînement peut poser des problèmes.

Note

L'utilisation de l'API hvd.DistributedOptimizer directement dans votre script d'entraînement peut entraîner une baisse des performances et de la vitesse d'entraînement, car l'API place implicitement l'opération AllReduce à l'intérieur de smp.step. Nous vous recommandons d'utiliser la bibliothèque de parallélisme de modèles avec Horovod en appelant directement hvd.allreduce après l'appel à accumulate() ou à reduce_mean() sur les gradients retournés par smp.step, comme le montre l'exemple suivant.

Pour en savoir plus sur l'API SageMaker de la bibliothèque de parallélisme des modèles, consultez la documentation de l'API.

import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))

Découpage manuel avec TensorFlow

Utilisez les gestionnaires de contexte smp.partition pour placer les opérations dans une partition spécifique. Toute opération non placée dans un contexte smp.partition est placée dans le default_partition. Pour en savoir plus sur l'API SageMaker de la bibliothèque de parallélisme des modèles, consultez la documentation de l'API.

import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

Fonctionnalités de framework non prises en charge

Les TensorFlow fonctionnalités suivantes ne sont pas prises en charge par la bibliothèque :

  • tf.GradientTape() n'est pas prise en charge pour le moment. À la place, vous pouvez utiliser Optimizer.get_gradients() ou Optimizer.compute_gradients() pour calculer les gradients.

  • L'API tf.train.Checkpoint.restore() n'est pas prise en charge pour le moment. Pour le pointage, utilisez smp.CheckpointManager, qui fournit la même API et la même fonctionnalité. Les restaurations de point de contrôle avec smp.CheckpointManager doivent intervenir après la première étape.