修改 PyTorch 訓練指令集 - Amazon SageMaker

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

修改 PyTorch 訓練指令集

在本節中,您將學習如何修改 PyTorch 訓練指令碼,以設定用於自動磁碟分割和手動磁碟分割的 SageMaker 模型平行程式庫。

注意

若要尋找程式庫支援哪些 PyTorch 版本,請參閱支援的架構與 AWS 區域

提示

如需示範如何搭配 SageMaker 模型平行程式庫使用 PyTorch 訓練指令集的 end-to-end 筆記本範例,請參閱Amazon SageMaker 模型並行程式庫 v1 範例

請注意,自動磁碟分割預設為啟用。除非特別指定,否則下列指令碼都採用自動分割。

自動拆分 PyTorch

若要使用 SageMaker的模型平行程式庫執行 PyTorch 訓練指令碼,必須進行下列訓練指令碼變更:

  1. 使用 smdistributed.modelparallel.torch.init() 匯入和初始化程式庫。

  2. smdistributed.modelparallel.torch.DistributedModel 包裝模型。請注意,從基礎 nn.Module 物件的 forward 方法傳回的任何張量,都將跨模型平行裝置廣播,造成通訊開銷增加,因此不需傳回在呼叫方法之外的非必要張量 (例如中繼啟動)。

    注意

    如為 FP16 訓練,您需要使用smdistributed.modelparallel.torch.model_creation()內容管理器來包裝模型。如需詳細資訊,請參閱 FP16使用模型平行度進行訓練

  3. smdistributed.modelparallel.torch.DistributedOptimizer 包裝最佳化工具。

    注意

    FP16 訓練時,您需要設定靜態或動態損失擴展功能。如需詳細資訊,請參閱 FP16使用模型平行度進行訓練

  4. 使用傳回的 DistributedModel 物件,而非使用者模型。

  5. 將轉送和向後邏輯放在 Step Function 中,並使用 smdistributed.modelparallel.torch.step 進行裝飾。

  6. 透過 torch.cuda.set_device(smp.local_rank()) 將每個程序限制在所屬裝置中。

  7. smp.step 呼叫之前,使用 .to() API 將輸入張量移動至 GPU (請參閱下列範例)。

  8. torch.Tensor.backwardtorch.autograd.backward 取代為 DistributedModel.backward

  9. 使用 StepOutput 方法 (如 reduce_mean) 對微批次的輸出上執行後處理。

  10. 如果有評估步驟,採取類似將轉送邏輯放在 smp.step-裝飾函式內的作法,並使用 StepOutput API 對輸出執行後處理。

  11. DataLoader 中設定 drop_last=True。或者,如果批次大小不能被微批次數量整除,則手動略過訓練循環中的批次。

要了解有關模型並行性庫 API SageMaker 的更多信息,請參閱 API 文檔

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() # define layers def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

手動分割 PyTorch

使用 smp.partition 內容管理員,將模組放置在特定裝置中。未放置在任何 smp.partition 內容中的任何模組都會放置在 default_partition。如果 auto_partition 設定為 False,則需要提供 default_partition。在特定 smp.partition 內容中建立的模組,將被放在對應的分割上。

要了解有關模型並行性庫 API SageMaker 的更多信息,請參閱 API 文檔

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() with smp.partition(0): # define child modules on device 0 with smp.partition(1): # define child modules on device 1 def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

考量事項

使用的模型平行程式庫設定 PyTorch 訓練指令碼時,您應該注意下列 SageMaker事項:

  • 如果您使用根據全域漸層規範的最佳化技術,像是整個模型的漸層規範 (例如 LAMB 最佳化工具的某些變體或全域漸層剪輯),您需要收集整個模型分割區中的所有規範以便進行勘誤。您可以透過程式庫的通訊基本資料類型來執行此操作。

  • 模型內 nn.Modules 的轉送方法中,所有 torch.Tensor 引數都必須用於模組輸出的運算中。換句話說,在有 torch.Tensor 引數的情況下,程式庫不支援模組輸出到未使用的模組。

  • smp.DistributedModel.backward() 呼叫的引數必須取決於所有模型輸出。換句話說,在轉送至 smp.DistributedModel.backward 呼叫的張量運算中未使用的 smp.DistributedModel.forward 呼叫輸出不應存在。

  • 如果程式碼中有torch.cuda.synchronize() 呼叫,您可能需要在同步呼叫之前立即呼叫 torch.cuda.set_device(smp.local_rank())。否則,可能會在裝置 0 中建立不必要的 CUDA 內容,這將額外消耗記憶體。

  • 由於程式庫將 nn.Modules 放置在不同的裝置上,因此模型中的模組不得依賴 smp.step 內部修改的任何全域狀態。在整個訓練期間保持固定的任何狀態,或在 smp.step 外以所有程序都可見的方式修改任何狀態,都是被允許的。

  • 使用程式庫時,您不需要將模型移動至 GPU (例如使用 model.to(device))。如果您在分割模型前 (在第一次 smp.step 呼叫前)試圖將模型移至 GPU,則移動呼叫將被忽略。程式庫會自動將指派給某個階級的模型部分移動至其 GPU。一旦開始使用程式庫進行訓練,請不要將模型移動到 CPU 並使用,因為未指派給該程序持有分割的模組,將不會有正確的參數。如果您想在使用模型並行程式庫訓練模型之後,在沒有程式庫的情況下重新訓練模型,建議的方法是使用我們的檢查點 API 儲存完整模型,並將其載入回一般模組。 PyTorch

  • 如果您有一個模組清單,以便將一個輸出轉送到另一個模組中,則將該清單取代為 nn.Sequential 可以顯著改善效能。

  • 權重更新 (optimizer.step()) 必須在 smp.step 之外進行,因為屆時整個向後傳遞程序已完成,且漸層已準備就緒。當使用具有模型和數據並行性 AllReduce 的混合模型時,此時,漸變也保證完成。

  • 將程式庫與資料 parallel 處理原則結合使用時,請確定所有資料平行排名上的批次數目都相同,這樣就 AllReduce 不會停止等待未參與步驟的等級。

  • 如果您使用 ml.p4d 執行個體類型 (例如 ml.p4d.24xlarge) 啟動訓練任務,則必須設定資料載入器變數 num_workers=0。舉例來說,您可以將 DataLoader 定義如下:

    dataloader = torch.utils.data.DataLoader( data, batch_size=batch_size, num_workers=0, pin_memory=True, drop_last=True, shuffle=shuffle, )
  • smp.step 的輸入必須是由 DataLoader 產生的模型輸入。這是因為 smp.step 會沿批次維度內部分割輸入張量,然後將其管道化。這表示將 DataLoader 本身傳遞給 smp.step 函式以生成內部的模型輸入是不可行的。

    舉例來說,假如將 DataLoader 定義如下:

    train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

    您應該存取由 train_loader 產生的模型輸入,並將其傳遞給具備 smp.step 裝飾的函式。請勿直接將 train_loader 傳遞給 smp.step 函式。

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): ... _, loss_mb = train_step(model, data, target) ... @smp.step def train_step(model, data, target): ... return output, loss
  • smp.step 的輸入張量,必須透過 .to() API 移動至目前裝置,這必須在 torch.cuda.set_device(local_rank()) 呼叫後執行。

    舉例來說,可以將 train 函式定義如下。該函式在使用輸入張量呼叫 train_step 之前,使用 .to() API 將 datatarget 增添至目前裝置。

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step()

    smp.set 裝飾函式的輸入張量已移動至上述 train 函式中的目前裝置。此模型需要移動到目前裝置。程式庫會自動將指派給某個階級的模型部分移動至其 GPU。

    @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss

不支援架構功能

模型平行程式庫不支 SageMaker援下列 PyTorch 功能:

  • 如果您將資料平行處理原則與原生 PyTorch DDP 搭配使用,則程式庫不支援torch.nn.parallel.DistributedDataParallel包裝函式模組。該庫內部管理與 PyTorch DDP 的集成,包括參數廣播和梯度 AllReduce。使用程式庫時,模組緩衝區僅在訓練開始時廣播一次。假如模型具備模組緩衝區,並需要在每個步驟中跨資料平行群組同步,您可以透過 torch.distributed API 執行此操作,使用透過 smp.get_dp_process_group() 取得的程序群組。

  • 如為混合精確度訓練,則不支援 apex.amp 模組。在自動混合精確度使用程式庫的情境中,建議使用 torch.cuda.amp 來操作,但在 Torch 中進行的實作則應使用 smp.amp.GradScaler

  • smp.DistributedModel 不支援 torch.jit.ScriptModulesScriptFunctions

  • apexapexFusedLayerNormFusedAdamFusedLAMBFusedNovoGrad 不支援。您可以透過 smp.optimizerssmp.nn API 來使用這些程式庫的實作。