As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Modificar um script TensorFlow de treinamento
Nesta seção, você aprende a modificar scripts de TensorFlow treinamento para configurar a biblioteca de paralelismo de SageMaker modelos para particionamento automático e particionamento manual. Essa seleção de exemplos também inclui um exemplo integrado ao Horovod para modelo híbrido e paralelismo de dados.
nota
Para descobrir quais TensorFlow versões são suportadas pela biblioteca, consulteFrameworks compatíveis e Regiões da AWS.
As modificações necessárias que você deve fazer em seu script de treinamento para usar a biblioteca estão listadas em Divisão automatizada com TensorFlow.
Para saber como modificar seu script de treinamento para usar o modelo híbrido e o paralelismo de dados com o Horovod, consulte Divisão automatizada com TensorFlow e Horovod para modelo híbrido e paralelismo de dados.
Se você quiser usar o particionamento manual, revise também Divisão manual com TensorFlow.
Os tópicos a seguir mostram exemplos de scripts de treinamento que você pode usar para configurar a biblioteca de paralelismo SageMaker de modelos da para particionamento automático e modelos de particionamento manual. TensorFlow
nota
O particionamento automático está habilitado por padrão. A menos que especificado de outra forma, os scripts de exemplo usam particionamento automático.
Tópicos
Divisão automatizada com TensorFlow
As seguintes alterações no script de treinamento são necessárias para executar um TensorFlow modelo com a biblioteca SageMaker de paralelismo de modelos:
-
Importe e inicialize a biblioteca com o
smp.init()
. -
Defina um modelo Keras herdando da classe Keras Model
smp.DistributedModel
em vez da classe Keras Model. Retorne as saídas do modelo do método de chamada do objeto smp.DistributedModel
. Esteja ciente de que qualquer tensor retornado do método de chamada será transmitido para dispositivos de paralelismo de modelo, acarretando custos de comunicação. Portanto, quaisquer tensores que não são necessários fora do método de chamada (como ativações intermediárias) não devem ser retornados. -
Defina
drop_remainder=True
no métodotf.Dataset.batch()
. Isso é para garantir que o tamanho do lote seja sempre divisível pelo número de microlotes. -
Semeie as operações aleatórias no data pipeline usando o
smp.dp_rank()
, por exemplo,shuffle(ds, seed=smp.dp_rank())
para garantir a consistência das amostras de dados em GPUs que contêm diferentes partições de modelo. -
Coloque a lógica para frente e para trás em uma step function e decore-a com
smp.step
. -
Execute o pós-processamento nas saídas em microlotes usando métodos
StepOutput
como reduce_mean
. A função dosmp.step
deve ter um valor de retorno que dependa da saída de smp.DistributedModel
. -
Se houver uma etapa de avaliação, coloque logicamente a frente (forward) dentro de uma função decorada com
smp.step
e processe os resultados usando a API doStepOutput
.
O script Python a seguir é um exemplo de script de treinamento após as alterações serem feitas.
import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
Se você terminar de preparar seu roteiro de treinamento, prossiga para Etapa 2: Iniciar um Training Job usando o SageMaker Python SDK. Se quiser executar um modelo híbrido e um trabalho de treinamento paralelo de dados, siga para a próxima seção.
Divisão automatizada com TensorFlow e Horovod para modelo híbrido e paralelismo de dados
Você pode usar a biblioteca de paralelismo de SageMaker modelos com o Horovod para modelos híbridos e paralelismo de dados. Para ler mais sobre como a biblioteca divide um modelo para paralelismo híbrido, consulte Paralelismo de tubulação (disponível para e) PyTorch TensorFlow.
Nesta etapa, vamos nos concentrar em como modificar seu script de treinamento para adaptar a biblioteca de paralelismo de SageMaker modelos.
Para configurar adequadamente seu script de treinamento para adotar a configuração de paralelismo híbrido que você definirá em Etapa 2: Iniciar um Training Job usando o SageMaker Python SDK, utilize as funções auxiliares da biblioteca, smp.dp_rank()
e smp.mp_rank()
, que detectam automaticamente o rank de paralelismo de dados e o rank de paralelismo de modelo, respectivamente.
Para encontrar todas as primitivas de MPI suportadas pela biblioteca, consulte Noções básicas de MPI na
As mudanças necessárias no script são:
-
Adicionando
hvd.allreduce
-
Variáveis de transmissão após o primeiro lote, conforme exigido pela Horovod
-
Disseminando operações de embaralhamento e/ou fragmentação no data pipeline com
smp.dp_rank()
.
nota
Ao usar o Horovod, você não deve solicitar diretamente hvd.init
no seu script de treinamento. Em vez disso, você precisará "horovod"
definir como True
nos parâmetros do SDK modelparallel
do SageMaker Python em. Etapa 2: Iniciar um Training Job usando o SageMaker Python SDK Isso permite que a biblioteca inicialize internamente o Horovod com base nas atribuições de dispositivos das partições do modelo. Chamar hvd.init()
diretamente em seu script de treinamento pode causar problemas.
nota
Usar a API do hvd.DistributedOptimizer
diretamente em seu script de treinamento pode resultar em performance e velocidade de treinamento ruins, porque a API coloca implicitamente a operação AllReduce
dentro do smp.step
. Recomendamos que você use a biblioteca de paralelismo de modelos com o Horovod chamando diretamente hvd.allreduce
após a chamada accumulate()
ou reduce_mean()
nos gradientes retornados smp.step
, conforme mostrado no exemplo a seguir.
import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))
Divisão manual com TensorFlow
Use gerenciadores de contexto do smp.partition
para colocar as operações em uma partição específica. Qualquer operação não colocada em nenhum contexto smp.partition
é colocada no default_partition
. Para saber mais sobre a API SageMaker da biblioteca de paralelismo de modelos, consulte a documentação da API.
import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
Recursos de framework incompatíveis
Os seguintes TensorFlow recursos não são compatíveis com a biblioteca:
-
O
tf.GradientTape()
não tem suporte no momento. Você pode usarOptimizer.get_gradients()
ouOptimizer.compute_gradients()
em vez disso para calcular gradientes. -
Atualmente, a API do
tf.train.Checkpoint.restore()
não tem suporte. Para pontos de verificação, usesmp.CheckpointManager
em vez disso, que fornece a mesma API e funcionalidade. Observe que as restaurações do ponto de verificação dosmp.CheckpointManager
devem ocorrer após a primeira etapa.