Modificar un script de entrenamiento de PyTorch - Amazon SageMaker

Modificar un script de entrenamiento de PyTorch

En esta sección, aprenderá a modificar los scripts de entrenamiento de PyTorch para configurar la biblioteca de paralelismo de modelos de SageMaker para la partición automática y la partición manual.

nota

Para saber qué versiones de PyTorch son compatibles con la biblioteca, consulte Marcos admitidos y Regiones de AWS.

sugerencia

Para obtener ejemplos de cuadernos integrables que muestran cómo utilizar un script de entrenamiento de PyTorch con la biblioteca de paralelismo de modelos de SageMaker, consulte Ejemplos de la biblioteca de paralelismo de modelos de Amazon SageMaker v1.

Tenga en cuenta que la partición automática está habilitada de forma predeterminada. A menos que se especifique lo contrario, los siguientes scripts utilizan la partición automática.

División automatizada con PyTorch

Se requieren los siguientes cambios en el script de entrenamiento para ejecutar un script de entrenamiento de PyTorch con la biblioteca de paralelismo de modelos de SageMaker:

  1. Importe e inicialice la biblioteca con smdistributed.modelparallel.torch.init().

  2. Encapsule el modelo con smdistributed.modelparallel.torch.DistributedModel. Tenga en cuenta que los tensores devueltos por el método forward del objeto subyacente se transmitirán a través de dispositivos paralelos al modelo, lo que generará una sobrecarga de comunicación, por lo que no se deben devolver los tensores que no sean necesarios fuera del método de llamada (como activaciones intermedias).

    nota

    Para el entrenamiento del FP16, debe usar el administrador de contexto smdistributed.modelparallel.torch.model_creation() para encapsular el modelo. Para obtener más información, consulte Entrenamiento con el FP16 con paralelismo de modelos.

  3. Encapsule el optimizador con smdistributed.modelparallel.torch.DistributedOptimizer.

    nota

    Para el entrenamiento de FP16, debe configurar el escalado de pérdida estático o dinámico. Para obtener más información, consulte Entrenamiento con el FP16 con paralelismo de modelos.

  4. Usa el objeto DistributedModel devuelto en lugar de un modelo de usuario.

  5. Coloque la lógica hacia adelante y hacia atrás en una función de paso y decórela con smdistributed.modelparallel.torch.step.

  6. Restrinja cada proceso a su propio dispositivo mediante torch.cuda.set_device(smp.local_rank()).

  7. Mueva los tensores de entrada a la GPU mediante la API .to() antes de la llamada smp.step (véase el ejemplo a continuación).

  8. Sustituya torch.Tensor.backward y torch.autograd.backward por DistributedModel.backward.

  9. Realice un procesamiento posterior en las salidas de los microlotes mediante los métodos StepOutput como reduce_mean.

  10. Si hay algún paso de evaluación, coloque de manera similar la lógica de avance dentro de la función decorada smp.step y procese posteriormente las salidas utilizando la API StepOutput.

  11. Establezca drop_last=True en DataLoader. Alternativamente, omita manualmente un lote en el ciclo de entrenamiento si el tamaño del lote no es divisible por el número de microlotes.

Para obtener más información sobre la API de biblioteca de paralelismo de modelos de SageMaker, consulte la Documentación API.

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() # define layers def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

División manual con PyTorch

Utilice los gestores de contexto de smp.partition para colocar módulos en dispositivos específicos. Todo módulo no colocado en ningún contexto smp.partition se colocará en default_partition. El default_partition debe proporcionarse si auto_partition está establecido en False. Los módulos que se crean dentro de un determinado contexto smp.partition se colocan en la partición correspondiente.

Para obtener más información sobre la API de biblioteca de paralelismo de modelos de SageMaker, consulte la Documentación API.

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() with smp.partition(0): # define child modules on device 0 with smp.partition(1): # define child modules on device 1 def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

Consideraciones

Al configurar un script de entrenamiento de PyTorch mediante la biblioteca de paralelismo de modelos de SageMaker, debe tener en cuenta lo siguiente:

  • Si utiliza una técnica de optimización que se basa en normas de degradado globales, por ejemplo, una norma de degradado de todo el modelo, como algunas variantes del optimizador LAMB o el recorte de degradado global, debe recopilar todas las normas en las particiones del modelo para que sean correctas. Puede utilizar los tipos de datos básicos de comunicación de la biblioteca para hacerlo.

  • Todos los argumentos torch.Tensor a los métodos de reenvío del modelo nn.Modules deben utilizarse en el cálculo de la salida del módulo. En otras palabras, la biblioteca no admite ese caso en el que hay un argumento torch.Tensor a un módulo del que no depende la salida del módulo.

  • El argumento a la llamada smp.DistributedModel.backward() debe depender de todas las salidas del modelo. En otras palabras, no puede haber una salida de la llamada smp.DistributedModel.forward que no se utilice en el cálculo del tensor que se introduce en la llamada smp.DistributedModel.backward.

  • Si hay llamadas torch.cuda.synchronize() en su código, es posible que tenga que llamar torch.cuda.set_device(smp.local_rank()) inmediatamente antes de la llamada de sincronización. De lo contrario, se podrían crear contextos CUDA innecesarios en el dispositivo 0, que consumirá memoria innecesariamente.

  • Desde que la biblioteca coloca nn.Modules en distintos dispositivos, los módulos del modelo no deben depender de ningún estado global modificado en smp.step. Cualquier estado que permanezca fijo durante todo el entrenamiento o que se modifique fuera de smp.step de forma visible para todos los procesos, está permitido.

  • No es necesario mover el modelo a la GPU (por ejemplo, usando model.to(device)) al utilizar la biblioteca. Si intenta mover el modelo a la GPU antes de particionar el modelo (antes de la primera llamada smp.step), se ignora la llamada de movimiento. La biblioteca mueve automáticamente la parte del modelo asignada a un rango a su GPU. Una vez que comience el entrenamiento con la biblioteca, no mueva el modelo a la CPU y lo utilice, ya que no tendrá parámetros correctos para los módulos no asignados a la partición que tiene el proceso. Si desea volver a entrenar un modelo o utilizarlo como inferencia sin la biblioteca después de haber sido entrenado con la biblioteca de paralelismo de modelos, la forma recomendada es guardar el modelo completo utilizando nuestra API de puntos de control y cargarlo de nuevo en un módulo PyTorch normal.

  • Si tiene una lista de módulos de modo que la salida de uno alimenta a otro, reemplazar esa lista por nn.Sequential puede mejorar significativamente el rendimiento.

  • La actualización de peso (optimizer.step()) tiene que ocurrir fuera de smp.stepporque es cuando se hace todo el paso hacia atrás y los gradientes están listos. Cuando se utiliza un modelo híbrido con paralelismo de modelo y datos, en este punto también se garantiza que AllReduce de gradientes finalizará.

  • Cuando utilice la biblioteca en combinación con el paralelismo de datos, asegúrese de que la cantidad de lotes en todos los rangos paralelos de datos sea la misma para que AllReduce no se bloquee esperando un rango que no participa en el paso.

  • Si lanza un trabajo de entrenamiento utilizando un tipo de instancia ml.p4d (como ml.p4d.24xlarge), debe establecer la variable del cargador de datos num_workers=0. Por ejemplo, puede definir su DataLoader de la siguiente manera:

    dataloader = torch.utils.data.DataLoader( data, batch_size=batch_size, num_workers=0, pin_memory=True, drop_last=True, shuffle=shuffle, )
  • Las entradas para smp.step deben ser las entradas de modelo generadas por DataLoader. Esto se debe a que smp.step divide internamente los tensores de entrada a lo largo de la dimensión del lote y los canaliza. Esto significa que pasar DataLoader a la smp.step para generar las entradas del modelo en el interior no funciona.

    Por ejemplo, si define un DataLoader de la siguiente manera:

    train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

    Debe acceder a las entradas del modelo generadas por train_loader y pasarlos a una función decorada por smp.step. No pase train_loader directamente a la función smp.step.

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): ... _, loss_mb = train_step(model, data, target) ... @smp.step def train_step(model, data, target): ... return output, loss
  • Los tensores de entrada a smp.step debe, moverse al dispositivo actual mediante la API .to(), que debe tener lugar después de la llamada torch.cuda.set_device(local_rank()).

    Por ejemplo, puede utilizar la función train de la siguiente manera. Esta función añade data y target al dispositivo actual utilizando la API .to() antes de usar esos tensores de entrada para llamar train_step.

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step()

    Los tensores de entrada para esta función decorado por smp.set se ha movido al dispositivo actual en la función train anterior. El modelo no debe moverse al dispositivo actual. La biblioteca mueve automáticamente la parte del modelo asignada a un rango a su GPU.

    @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss

Características del marco no compatibles

La biblioteca de paralelismo de modelos de SageMaker no admite las siguientes funciones de PyTorch:

  • Si utiliza paralelismo de datos con el DDP de PyTorch nativo, el módulo encapsulador torch.nn.parallel.DistributedDataParallel no es compatible con la biblioteca. La biblioteca administra internamente la integración con el DDP de PyTorch, incluidos la difusión de parámetros y el gradiente AllReduce. Cuando se utiliza la biblioteca, los búferes de módulo solo se transmiten una vez al comienzo del entrenamiento. Si el modelo tiene búferes de módulo que deben sincronizarse entre los grupos paralelos de datos en cada paso, puede hacerlo a través de la API torch.distributed, utilizando el grupo de procesos que se puede obtener mediante smp.get_dp_process_group().

  • Para un entrenamiento de precisión mixta, el módulo apex.amp no es compatible. La forma recomendada de utilizar la biblioteca con precisión mixta automática es utilizar torch.cuda.amp, con la excepción de utilizar smp.amp.GradScaler en lugar de la aplicación en antorcha.

  • torch.jit.ScriptModules o ScriptFunctions no son compatibles con smp.DistributedModel.

  • apex : FusedLayerNorm, FusedAdam, FusedLAMB y FusedNovoGrad de apex no son compatibles. Puede utilizar las implementaciones de la biblioteca de estos mediante las API smp.optimizers y smp.nn en su lugar.