本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
修改 PyTorch 訓練指令集
在本節中,您將學習如何修改 PyTorch 訓練指令碼,以設定用於自動磁碟分割和手動磁碟分割的 SageMaker 模型平行程式庫。
注意
若要尋找程式庫支援哪些 PyTorch 版本,請參閱支援的架構與 AWS 區域。
提示
如需示範如何搭配 SageMaker 模型平行程式庫使用 PyTorch 訓練指令集的 end-to-end 筆記本範例,請參閱Amazon SageMaker 模型並行程式庫 v1 範例。
請注意,自動磁碟分割預設為啟用。除非特別指定,否則下列指令碼都採用自動分割。
自動拆分 PyTorch
若要使用 SageMaker的模型平行程式庫執行 PyTorch 訓練指令碼,必須進行下列訓練指令碼變更:
-
使用
smdistributed.modelparallel.torch.init()
匯入和初始化程式庫。 -
以
smdistributed.modelparallel.torch.DistributedModel
包裝模型。請注意,從基礎 nn.Module
物件的forward
方法傳回的任何張量,都將跨模型平行裝置廣播,造成通訊開銷增加,因此不需傳回在呼叫方法之外的非必要張量 (例如中繼啟動)。注意
如為 FP16 訓練,您需要使用smdistributed.modelparallel.torch.model_creation()
內容管理器來包裝模型。如需詳細資訊,請參閱 FP16使用模型平行度進行訓練。 -
以
smdistributed.modelparallel.torch.DistributedOptimizer
包裝最佳化工具。 注意
FP16 訓練時,您需要設定靜態或動態損失擴展功能。如需詳細資訊,請參閱 FP16使用模型平行度進行訓練。
-
使用傳回的
DistributedModel
物件,而非使用者模型。 -
將轉送和向後邏輯放在 Step Function 中,並使用
smdistributed.modelparallel.torch.step
進行裝飾。 -
透過
torch.cuda.set_device(smp.local_rank())
將每個程序限制在所屬裝置中。 -
在
smp.step
呼叫之前,使用.to()
API 將輸入張量移動至 GPU (請參閱下列範例)。 -
將
torch.Tensor.backward
和torch.autograd.backward
取代為DistributedModel.backward
。 -
使用
StepOutput
方法 (如 reduce_mean
) 對微批次的輸出上執行後處理。 -
如果有評估步驟,採取類似將轉送邏輯放在
smp.step
-裝飾函式內的作法,並使用StepOutput
API對輸出執行後處理。 -
在
DataLoader
中設定drop_last=True
。或者,如果批次大小不能被微批次數量整除,則手動略過訓練循環中的批次。
要了解有關模型並行性庫 API SageMaker 的更多信息,請參閱 API 文檔
import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() # define layers def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)
手動分割 PyTorch
使用 smp.partition
smp.partition
內容中的任何模組都會放置在 default_partition
。如果 auto_partition
設定為 False
,則需要提供 default_partition
。在特定 smp.partition
內容中建立的模組,將被放在對應的分割上。
要了解有關模型並行性庫 API SageMaker 的更多信息,請參閱 API 文檔
import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() with smp.partition(0): # define child modules on device 0 with smp.partition(1): # define child modules on device 1 def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)
考量事項
使用的模型平行程式庫設定 PyTorch 訓練指令碼時,您應該注意下列 SageMaker事項:
-
如果您使用根據全域漸層規範的最佳化技術,像是整個模型的漸層規範 (例如 LAMB 最佳化工具的某些變體或全域漸層剪輯),您需要收集整個模型分割區中的所有規範以便進行勘誤。您可以透過程式庫的通訊基本資料類型來執行此操作。
-
模型內
nn.Modules
的轉送方法中,所有torch.Tensor
引數都必須用於模組輸出的運算中。換句話說,在有torch.Tensor
引數的情況下,程式庫不支援模組輸出到未使用的模組。 -
smp.DistributedModel.backward()
呼叫的引數必須取決於所有模型輸出。換句話說,在轉送至smp.DistributedModel.backward
呼叫的張量運算中未使用的smp.DistributedModel.forward
呼叫輸出不應存在。 -
如果程式碼中有
torch.cuda.synchronize()
呼叫,您可能需要在同步呼叫之前立即呼叫torch.cuda.set_device(smp.local_rank())
。否則,可能會在裝置 0 中建立不必要的 CUDA 內容,這將額外消耗記憶體。 -
由於程式庫將
nn.Modules
放置在不同的裝置上,因此模型中的模組不得依賴smp.step
內部修改的任何全域狀態。在整個訓練期間保持固定的任何狀態,或在smp.step
外以所有程序都可見的方式修改任何狀態,都是被允許的。 -
使用程式庫時,您不需要將模型移動至 GPU (例如使用
model.to(device)
)。如果您在分割模型前 (在第一次smp.step
呼叫前)試圖將模型移至 GPU,則移動呼叫將被忽略。程式庫會自動將指派給某個階級的模型部分移動至其 GPU。一旦開始使用程式庫進行訓練,請不要將模型移動到 CPU 並使用,因為未指派給該程序持有分割的模組,將不會有正確的參數。如果您想在使用模型並行程式庫訓練模型之後,在沒有程式庫的情況下重新訓練模型,建議的方法是使用我們的檢查點 API 儲存完整模型,並將其載入回一般模組。 PyTorch -
如果您有一個模組清單,以便將一個輸出轉送到另一個模組中,則將該清單取代為
nn.Sequential
可以顯著改善效能。 -
權重更新 (
optimizer.step()
) 必須在smp.step
之外進行,因為屆時整個向後傳遞程序已完成,且漸層已準備就緒。當使用具有模型和數據並行性 AllReduce 的混合模型時,此時,漸變也保證完成。 -
將程式庫與資料 parallel 處理原則結合使用時,請確定所有資料平行排名上的批次數目都相同,這樣就 AllReduce 不會停止等待未參與步驟的等級。
-
如果您使用 ml.p4d 執行個體類型 (例如 ml.p4d.24xlarge) 啟動訓練任務,則必須設定資料載入器變數
num_workers=0
。舉例來說,您可以將DataLoader
定義如下:dataloader = torch.utils.data.DataLoader( data, batch_size=batch_size, num_workers=0, pin_memory=True, drop_last=True, shuffle=shuffle, )
-
smp.step
的輸入必須是由DataLoader
產生的模型輸入。這是因為smp.step
會沿批次維度內部分割輸入張量,然後將其管道化。這表示將DataLoader
本身傳遞給smp.step
函式以生成內部的模型輸入是不可行的。舉例來說,假如將
DataLoader
定義如下:train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)
您應該存取由
train_loader
產生的模型輸入,並將其傳遞給具備smp.step
裝飾的函式。請勿直接將train_loader
傳遞給smp.step
函式。def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): ... _, loss_mb = train_step(model, data, target) ... @smp.step def train_step(model, data, target): ... return output, loss
-
smp.step
的輸入張量,必須透過.to()
API 移動至目前裝置,這必須在torch.cuda.set_device(local_rank())
呼叫後執行。舉例來說,可以將
train
函式定義如下。該函式在使用輸入張量呼叫train_step
之前,使用.to()
API 將data
和target
增添至目前裝置。def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step()
此
smp.set
裝飾函式的輸入張量已移動至上述train
函式中的目前裝置。此模型不需要移動到目前裝置。程式庫會自動將指派給某個階級的模型部分移動至其 GPU。@smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss
不支援架構功能
模型平行程式庫不支 SageMaker援下列 PyTorch 功能:
-
如果您將資料平行處理原則與原生 PyTorch DDP
搭配使用,則程式庫不支援 torch.nn.parallel.DistributedDataParallel
包裝函式模組。該庫內部管理與 PyTorch DDP 的集成,包括參數廣播和梯度 AllReduce。使用程式庫時,模組緩衝區僅在訓練開始時廣播一次。假如模型具備模組緩衝區,並需要在每個步驟中跨資料平行群組同步,您可以透過 torch.distributed
API 執行此操作,使用透過smp.get_dp_process_group()
取得的程序群組。 -
如為混合精確度訓練,則不支援
apex.amp
模組。在自動混合精確度使用程式庫的情境中,建議使用torch.cuda.amp
來操作,但在 Torch 中進行的實作則應使用smp.amp.GradScaler
。 -
smp.DistributedModel
不支援torch.jit.ScriptModules
或ScriptFunctions
。 -
apex
:apex
的FusedLayerNorm
、FusedAdam
、FusedLAMB
和FusedNovoGrad
不支援。您可以透過smp.optimizers
和smp.nn
API 來使用這些程式庫的實作。