修改 PyTorch 訓練指令碼 - Amazon SageMaker AI

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

修改 PyTorch 訓練指令碼

在本節中,您將了解如何修改 PyTorch 訓練指令碼,以設定 SageMaker 模型平行處理程式庫以進行自動分割和手動分割。

注意

若要尋找程式庫支援的 PyTorch 版本,請參閱 支援的架構與 AWS 區域

提示

如需 end-to-end示範如何搭配 SageMaker 模型平行處理程式庫使用 PyTorch 訓練指令碼的筆記本範例,請參閱 Amazon SageMaker AI 模型平行處理程式庫 v1 範例

請注意,自動磁碟分割預設為啟用。除非特別指定,否則下列指令碼都採用自動分割。

使用 自動分割 PyTorch

執行具有模型平行處理程式庫 SageMaker的訓練指令碼需要下列 PyTorch 訓練指令碼變更:

  1. 使用 smdistributed.modelparallel.torch.init() 匯入和初始化程式庫。

  2. smdistributed.modelparallel.torch.DistributedModel 包裝模型。請注意,從基礎 nn.Module 物件的 forward 方法傳回的任何張量,都將跨模型平行裝置廣播,造成通訊開銷增加,因此不需傳回在呼叫方法之外的非必要張量 (例如中繼啟動)。

    注意

    針對FP16訓練,您需要使用 smdistributed.modelparallel.torch.model_creation() 內容管理員來包裝模型。如需詳細資訊,請參閱FP16 使用模型平行性進行訓練

  3. smdistributed.modelparallel.torch.DistributedOptimizer 包裝最佳化工具。

    注意

    針對FP16訓練,您需要設定靜態或動態損失擴展。如需詳細資訊,請參閱FP16 使用模型平行性進行訓練

  4. 使用傳回的 DistributedModel 物件,而非使用者模型。

  5. 將轉送和向後邏輯放在 Step Function 中,並使用 smdistributed.modelparallel.torch.step 進行裝飾。

  6. 透過 torch.cuda.set_device(smp.local_rank()) 將每個程序限制在所屬裝置中。

  7. smp.step呼叫.to()API之前,GPU使用 將輸入張量移至 (請參閱以下範例)。

  8. torch.Tensor.backwardtorch.autograd.backward 取代為 DistributedModel.backward

  9. 使用 StepOutput 方法 (如 reduce_mean) 對微批次的輸出上執行後處理。

  10. 如果有評估步驟, 也會將轉送邏輯放在 smp.step裝飾的 函數中,並使用 處理輸出StepOutputAPI

  11. DataLoader 中設定 drop_last=True。或者,如果批次大小不能被微批次數量整除,則手動略過訓練循環中的批次。

若要進一步了解 SageMaker模型平行處理程式庫 API,請參閱 API 文件

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() # define layers def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

使用 手動分割 PyTorch

使用 smp.partition 內容管理員,將模組放置在特定裝置中。未放置在任何 smp.partition 內容中的任何模組都會放置在 default_partition。如果 auto_partition 設定為 False,則需要提供 default_partition。在特定 smp.partition 內容中建立的模組,將被放在對應的分割上。

若要進一步了解 SageMaker模型平行處理程式庫 API,請參閱 API 文件

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() with smp.partition(0): # define child modules on device 0 with smp.partition(1): # define child modules on device 1 def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

考量事項

使用 SageMaker模型平行處理程式庫設定 PyTorch 訓練指令碼時,您應該注意下列事項:

  • 如果您使用依賴全域梯度規範的最佳化技術,例如來自整個模型的梯度規範,例如某些LAMB最佳化工具變體或全域梯度剪輯,則需要收集跨模型分割區的所有規範,以確保正確性。您可以透過程式庫的通訊基本資料類型來執行此操作。

  • 模型內 nn.Modules 的轉送方法中,所有 torch.Tensor 引數都必須用於模組輸出的運算中。換句話說,在有 torch.Tensor 引數的情況下,程式庫不支援模組輸出到未使用的模組。

  • smp.DistributedModel.backward() 呼叫的引數必須取決於所有模型輸出。換句話說,在轉送至 smp.DistributedModel.backward 呼叫的張量運算中未使用的 smp.DistributedModel.forward 呼叫輸出不應存在。

  • 如果程式碼中有torch.cuda.synchronize() 呼叫,您可能需要在同步呼叫之前立即呼叫 torch.cuda.set_device(smp.local_rank())。否則,可能會在裝置 0 中建立不必要的CUDA內容,這將不必要的耗用記憶體。

  • 由於程式庫將 nn.Modules 放置在不同的裝置上,因此模型中的模組不得依賴 smp.step 內部修改的任何全域狀態。在整個訓練期間保持固定的任何狀態,或在 smp.step 外以所有程序都可見的方式修改任何狀態,都是被允許的。

  • 使用程式庫時,您不需要將模型移至 GPU(例如,使用 model.to(device))。如果您嘗試在分割模型GPU之前 (第一次smp.step呼叫之前) 將模型移至 ,則會忽略移動呼叫。程式庫會自動將指派給排名的模型部分移至其 GPU。使用程式庫進行訓練後,請勿將模型移至 CPU 並使用它,因為其對於未指派給程序所保留分割區的模組,沒有正確的參數。如果您想要重新訓練模型,或在使用模型平行處理程式庫進行訓練後,將其用於在沒有程式庫的情況下進行推論,建議方法是使用檢查點儲存完整模型,API並將其載入到一般 PyTorch 模組。

  • 如果您有一個模組清單,以便將一個輸出轉送到另一個模組中,則將該清單取代為 nn.Sequential 可以顯著改善效能。

  • 權重更新 (optimizer.step()) 必須在 smp.step 之外進行,因為屆時整個向後傳遞程序已完成,且漸層已準備就緒。使用混合模型搭配模型和資料平行處理時,目前也保證完成 AllReduce 漸層。

  • 將程式庫與資料平行處理結合使用時,請確定所有資料平行排名上的批次數量相同,如此 就 AllReduce 不會等待未參與步驟的排名。

  • 如果您使用 ml.p4d 執行個體類型 (例如 ml.p4d.24xlarge) 啟動訓練任務,則必須設定資料載入器變數 num_workers=0。舉例來說,您可以將 DataLoader 定義如下:

    dataloader = torch.utils.data.DataLoader( data, batch_size=batch_size, num_workers=0, pin_memory=True, drop_last=True, shuffle=shuffle, )
  • smp.step 的輸入必須是由 DataLoader 產生的模型輸入。這是因為 smp.step 會沿批次維度內部分割輸入張量,然後將其管道化。這表示將 DataLoader 本身傳遞給 smp.step 函式以生成內部的模型輸入是不可行的。

    舉例來說,假如將 DataLoader 定義如下:

    train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

    您應該存取由 train_loader 產生的模型輸入,並將其傳遞給具備 smp.step 裝飾的函式。請勿直接將 train_loader 傳遞給 smp.step 函式。

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): ... _, loss_mb = train_step(model, data, target) ... @smp.step def train_step(model, data, target): ... return output, loss
  • 至 的輸入張量smp.step必須使用 .to() 移至目前的裝置API,這必須在torch.cuda.set_device(local_rank())呼叫後發生。

    舉例來說,可以將 train 函式定義如下。此函數會在使用這些輸入張量呼叫 .to()API之前,使用 targetdata和 新增至目前的裝置train_step

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step()

    smp.set 裝飾函式的輸入張量已移動至上述 train 函式中的目前裝置。此模型需要移動到目前裝置。程式庫會自動將指派給排名的模型部分移至其 GPU。

    @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss

不支援架構功能

SageMaker模型平行處理程式庫不支援下列 PyTorch 功能:

  • 如果您將資料平行處理與原生 搭配使用PyTorch DDP,則程式庫不支援torch.nn.parallel.DistributedDataParallel包裝函式模組。程式庫會在內部管理與 的整合 PyTorch DDP,包括參數廣播和漸層 AllReduce。使用程式庫時,模組緩衝區僅在訓練開始時廣播一次。如果您的模型有模組緩衝區,需要在每個步驟跨資料平行群組進行同步,您可以使用可透過 torch.distributed 取得的程序群組API,透過 進行同步smp.get_dp_process_group()

  • 如為混合精確度訓練,則不支援 apex.amp 模組。在自動混合精確度使用程式庫的情境中,建議使用 torch.cuda.amp 來操作,但在 Torch 中進行的實作則應使用 smp.amp.GradScaler

  • smp.DistributedModel 不支援 torch.jit.ScriptModulesScriptFunctions

  • apexapexFusedLayerNormFusedAdamFusedLAMBFusedNovoGrad 不支援。您可以smp.nnAPIs改為透過 smp.optimizers和 使用這些程式庫實作。