Modificar um script TensorFlow de treinamento - Amazon SageMaker

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Modificar um script TensorFlow de treinamento

Nesta seção, você aprende a modificar scripts de TensorFlow treinamento para configurar a biblioteca de paralelismo de SageMaker modelos para particionamento automático e particionamento manual. Essa seleção de exemplos também inclui um exemplo integrado ao Horovod para modelo híbrido e paralelismo de dados.

nota

Para descobrir quais TensorFlow versões são suportadas pela biblioteca, consulteFrameworks compatíveis e Regiões da AWS.

As modificações necessárias que você deve fazer em seu script de treinamento para usar a biblioteca estão listadas em Divisão automatizada com TensorFlow.

Para saber como modificar seu script de treinamento para usar o modelo híbrido e o paralelismo de dados com o Horovod, consulte Divisão automatizada com TensorFlow e Horovod para modelo híbrido e paralelismo de dados.

Se você quiser usar o particionamento manual, revise também Divisão manual com TensorFlow.

Os tópicos a seguir mostram exemplos de scripts de treinamento que você pode usar para configurar a biblioteca de paralelismo SageMaker de modelos da para particionamento automático e modelos de particionamento manual. TensorFlow

nota

O particionamento automático está habilitado por padrão. A menos que especificado de outra forma, os scripts de exemplo usam particionamento automático.

Divisão automatizada com TensorFlow

As seguintes alterações no script de treinamento são necessárias para executar um TensorFlow modelo com a biblioteca SageMaker de paralelismo de modelos:

  1. Importe e inicialize a biblioteca com o smp.init().

  2. Defina um modelo Keras herdando da classe Keras Model smp.DistributedModel em vez da classe Keras Model. Retorne as saídas do modelo do método de chamada do objeto smp.DistributedModel. Esteja ciente de que qualquer tensor retornado do método de chamada será transmitido para dispositivos de paralelismo de modelo, acarretando custos de comunicação. Portanto, quaisquer tensores que não são necessários fora do método de chamada (como ativações intermediárias) não devem ser retornados.

  3. Defina drop_remainder=True no método tf.Dataset.batch(). Isso é para garantir que o tamanho do lote seja sempre divisível pelo número de microlotes.

  4. Semeie as operações aleatórias no data pipeline usando o smp.dp_rank(), por exemplo, shuffle(ds, seed=smp.dp_rank()) para garantir a consistência das amostras de dados em GPUs que contêm diferentes partições de modelo.

  5. Coloque a lógica para frente e para trás em uma step function e decore-a com smp.step.

  6. Execute o pós-processamento nas saídas em microlotes usando métodos StepOutput como reduce_mean. A função do smp.step deve ter um valor de retorno que dependa da saída de smp.DistributedModel.

  7. Se houver uma etapa de avaliação, coloque logicamente a frente (forward) dentro de uma função decorada com smp.step e processe os resultados usando a API do StepOutput.

Para saber mais sobre a API SageMaker da biblioteca de paralelismo de modelos, consulte a documentação da API.

O script Python a seguir é um exemplo de script de treinamento após as alterações serem feitas.

import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

Se você terminar de preparar seu roteiro de treinamento, prossiga para Etapa 2: Iniciar um Training Job usando o SageMaker Python SDK. Se quiser executar um modelo híbrido e um trabalho de treinamento paralelo de dados, siga para a próxima seção.

Divisão automatizada com TensorFlow e Horovod para modelo híbrido e paralelismo de dados

Você pode usar a biblioteca de paralelismo de SageMaker modelos com o Horovod para modelos híbridos e paralelismo de dados. Para ler mais sobre como a biblioteca divide um modelo para paralelismo híbrido, consulte Paralelismo de tubulação (disponível para e) PyTorch TensorFlow.

Nesta etapa, vamos nos concentrar em como modificar seu script de treinamento para adaptar a biblioteca de paralelismo de SageMaker modelos.

Para configurar adequadamente seu script de treinamento para adotar a configuração de paralelismo híbrido que você definirá em Etapa 2: Iniciar um Training Job usando o SageMaker Python SDK, utilize as funções auxiliares da biblioteca, smp.dp_rank() e smp.mp_rank(), que detectam automaticamente o rank de paralelismo de dados e o rank de paralelismo de modelo, respectivamente.

Para encontrar todas as primitivas de MPI suportadas pela biblioteca, consulte Noções básicas de MPI na documentação do SDK para Python SageMaker .

As mudanças necessárias no script são:

  • Adicionando hvd.allreduce

  • Variáveis de transmissão após o primeiro lote, conforme exigido pela Horovod

  • Disseminando operações de embaralhamento e/ou fragmentação no data pipeline com smp.dp_rank().

nota

Ao usar o Horovod, você não deve solicitar diretamente hvd.init no seu script de treinamento. Em vez disso, você precisará "horovod" definir como True nos parâmetros do SDK modelparallel do SageMaker Python em. Etapa 2: Iniciar um Training Job usando o SageMaker Python SDK Isso permite que a biblioteca inicialize internamente o Horovod com base nas atribuições de dispositivos das partições do modelo. Chamar hvd.init() diretamente em seu script de treinamento pode causar problemas.

nota

Usar a API do hvd.DistributedOptimizer diretamente em seu script de treinamento pode resultar em performance e velocidade de treinamento ruins, porque a API coloca implicitamente a operação AllReduce dentro do smp.step. Recomendamos que você use a biblioteca de paralelismo de modelos com o Horovod chamando diretamente hvd.allreduce após a chamada accumulate() ou reduce_mean() nos gradientes retornados smp.step, conforme mostrado no exemplo a seguir.

Para saber mais sobre a API SageMaker da biblioteca de paralelismo de modelos, consulte a documentação da API.

import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))

Divisão manual com TensorFlow

Use gerenciadores de contexto do smp.partition para colocar as operações em uma partição específica. Qualquer operação não colocada em nenhum contexto smp.partition é colocada no default_partition. Para saber mais sobre a API SageMaker da biblioteca de paralelismo de modelos, consulte a documentação da API.

import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

Recursos de framework incompatíveis

Os seguintes TensorFlow recursos não são compatíveis com a biblioteca:

  • O tf.GradientTape() não tem suporte no momento. Você pode usar Optimizer.get_gradients() ou Optimizer.compute_gradients() em vez disso para calcular gradientes.

  • Atualmente, a API do tf.train.Checkpoint.restore() não tem suporte. Para pontos de verificação, use smp.CheckpointManager em vez disso, que fornece a mesma API e funcionalidade. Observe que as restaurações do ponto de verificação do smp.CheckpointManager devem ocorrer após a primeira etapa.