修改 PyTorch 訓練指令碼 - Amazon SageMaker AI

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

修改 PyTorch 訓練指令碼

在本節中,您將了解如何修改 PyTorch 訓練指令碼,以針對自動磁碟分割和手動磁碟分割設定 SageMaker 模型平行處理程式庫。

注意

若需有關程式庫支援的 PyTorch 版本資訊,請參閱支援的架構與 AWS 區域

提示

如需完整徹底的筆記本範例,示範如何將 PyTorch 訓練指令碼與 SageMaker 模型平行處理程式庫搭配使用,請參閱Amazon SageMaker AI 模型平行處理程式庫 v1 範例

請注意,自動磁碟分割預設為啟用。除非特別指定,否則下列指令碼都採用自動分割。

使用 PyTorch 自動化分割

若要使用 SageMaker 模型平行處理程式庫執行 PyTorch 訓練指令碼,必須變更以下訓練指令碼:

  1. 使用 smdistributed.modelparallel.torch.init() 匯入和初始化程式庫。

  2. smdistributed.modelparallel.torch.DistributedModel 包裝模型。請注意,從基礎 nn.Module 物件的 forward 方法傳回的任何張量,都將跨模型平行裝置廣播,造成通訊開銷增加,因此不需傳回在呼叫方法之外的非必要張量 (例如中繼啟動)。

    注意

    如為 FP16 訓練,您需要使用smdistributed.modelparallel.torch.model_creation()內容管理器來包裝模型。如需詳細資訊,請參閱使用模型平行處理進行 FP16 訓練

  3. smdistributed.modelparallel.torch.DistributedOptimizer 包裝最佳化工具。

    注意

    FP16 訓練時,您需要設定靜態或動態損失擴展功能。如需詳細資訊,請參閱使用模型平行處理進行 FP16 訓練

  4. 使用傳回的 DistributedModel 物件,而非使用者模型。

  5. 將轉送和向後邏輯放在 Step Function 中,並使用 smdistributed.modelparallel.torch.step 進行裝飾。

  6. 透過 torch.cuda.set_device(smp.local_rank()) 將每個程序限制在所屬裝置中。

  7. smp.step 呼叫之前,使用 .to() API 將輸入張量移動至 GPU (請參閱下列範例)。

  8. torch.Tensor.backwardtorch.autograd.backward 取代為 DistributedModel.backward

  9. 使用 StepOutput 方法 (如 reduce_mean) 對微批次的輸出上執行後處理。

  10. 如果有評估步驟,採取類似將轉送邏輯放在 smp.step-裝飾函式內的作法,並使用 StepOutput API 對輸出執行後處理。

  11. DataLoader 中設定 drop_last=True。或者,如果批次大小不能被微批次數量整除,則手動略過訓練循環中的批次。

若要進一步了解 SageMaker 模型平行處理程式庫 API,請參閱 API 文件

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() # define layers def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

使用 PyTorch 手動分割

使用 smp.partition 內容管理員,將模組放置在特定裝置中。未放置在任何 smp.partition 內容中的任何模組都會放置在 default_partition。如果 auto_partition 設定為 False,則需要提供 default_partition。在特定 smp.partition 內容中建立的模組,將被放在對應的分割上。

若要進一步了解 SageMaker 模型平行處理程式庫 API,請參閱 API 文件

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() with smp.partition(0): # define child modules on device 0 with smp.partition(1): # define child modules on device 1 def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

考量事項

使用 SageMaker 模型平行處理程式庫設定 PyTorch 訓練指令碼時,有下列幾點需要注意:

  • 如果您使用根據全域漸層規範的最佳化技術,像是整個模型的漸層規範 (例如 LAMB 最佳化工具的某些變體或全域漸層剪輯),您需要收集整個模型分割區中的所有規範以便進行勘誤。您可以透過程式庫的通訊基本資料類型來執行此操作。

  • 模型內 nn.Modules 的轉送方法中,所有 torch.Tensor 引數都必須用於模組輸出的運算中。換句話說,在有 torch.Tensor 引數的情況下,程式庫不支援模組輸出到未使用的模組。

  • smp.DistributedModel.backward() 呼叫的引數必須取決於所有模型輸出。換句話說,在轉送至 smp.DistributedModel.backward 呼叫的張量運算中未使用的 smp.DistributedModel.forward 呼叫輸出不應存在。

  • 如果程式碼中有torch.cuda.synchronize() 呼叫,您可能需要在同步呼叫之前立即呼叫 torch.cuda.set_device(smp.local_rank())。否則,可能會在裝置 0 中建立不必要的 CUDA 內容,這將額外消耗記憶體。

  • 由於程式庫將 nn.Modules 放置在不同的裝置上,因此模型中的模組不得依賴 smp.step 內部修改的任何全域狀態。在整個訓練期間保持固定的任何狀態,或在 smp.step 外以所有程序都可見的方式修改任何狀態,都是被允許的。

  • 使用程式庫時,您不需要將模型移動至 GPU (例如使用 model.to(device))。如果您在分割模型前 (在第一次 smp.step 呼叫前)試圖將模型移至 GPU,則移動呼叫將被忽略。程式庫會自動將指派給某個階級的模型部分移動至其 GPU。一旦開始使用程式庫進行訓練,請不要將模型移動到 CPU 並使用,因為未指派給該程序持有分割的模組,將不會有正確的參數。如果您想在使用模型平行處理程式庫訓練模型之後,在沒有程式庫的前提下重新訓練模型或作為推論使用,建議使用檢查點 API 儲存完整模型,並將其載入至一般的 PyTorch 模組。

  • 如果您有一個模組清單,以便將一個輸出轉送到另一個模組中,則將該清單取代為 nn.Sequential 可以顯著改善效能。

  • 權重更新 (optimizer.step()) 必須在 smp.step 之外進行,因為屆時整個向後傳遞程序已完成,且漸層已準備就緒。當使用具有模型和資料平行處理的混合模型時,漸層的 AllReduce 也必然會完成。

  • 將程式庫與資料平行處理結合使用時,請確定所有資料平行排名上的批次數量都相同,如此一來,AllReduce 便不會懸置等待步驟內未參與的排名。

  • 如果您使用 ml.p4d 執行個體類型 (例如 ml.p4d.24xlarge) 啟動訓練任務,則必須設定資料載入器變數 num_workers=0。舉例來說,您可以將 DataLoader 定義如下:

    dataloader = torch.utils.data.DataLoader( data, batch_size=batch_size, num_workers=0, pin_memory=True, drop_last=True, shuffle=shuffle, )
  • smp.step 的輸入必須是由 DataLoader 產生的模型輸入。這是因為 smp.step 會沿批次維度內部分割輸入張量,然後將其管道化。這表示將 DataLoader 本身傳遞給 smp.step 函式以生成內部的模型輸入是不可行的。

    舉例來說,假如將 DataLoader 定義如下:

    train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

    您應該存取由 train_loader 產生的模型輸入,並將其傳遞給具備 smp.step 裝飾的函式。請勿直接將 train_loader 傳遞給 smp.step 函式。

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): ... _, loss_mb = train_step(model, data, target) ... @smp.step def train_step(model, data, target): ... return output, loss
  • smp.step 的輸入張量,必須透過 .to() API 移動至目前裝置,這必須在 torch.cuda.set_device(local_rank()) 呼叫後執行。

    舉例來說,可以將 train 函式定義如下。該函式在使用輸入張量呼叫 train_step 之前,使用 .to() API 將 datatarget 增添至目前裝置。

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step()

    smp.set 裝飾函式的輸入張量已移動至上述 train 函式中的目前裝置。此模型需要移動到目前裝置。程式庫會自動將指派給某個階級的模型部分移動至其 GPU。

    @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss

不支援架構功能

SageMaker 模型平行處理程式庫不支援下列 PyTorch 功能:

  • 如果您使用資料平行處理搭配原生 PyTorch DDP,則程式庫不支援 torch.nn.parallel.DistributedDataParallel 包裝函式模組。程式庫內部管理整合 PyTorch DDP ,包含參數廣播和漸層 AllReduce。使用程式庫時,模組緩衝區僅在訓練開始時廣播一次。假如模型具備模組緩衝區,並需要在每個步驟中跨資料平行群組同步,您可以透過 torch.distributed API 執行此操作,使用透過 smp.get_dp_process_group() 取得的程序群組。

  • 如為混合精確度訓練,則不支援 apex.amp 模組。在自動混合精確度使用程式庫的情境中,建議使用 torch.cuda.amp 來操作,但在 Torch 中進行的實作則應使用 smp.amp.GradScaler

  • smp.DistributedModel 不支援 torch.jit.ScriptModulesScriptFunctions

  • apexapexFusedLayerNormFusedAdamFusedLAMBFusedNovoGrad 不支援。您可以透過 smp.optimizerssmp.nn API 來使用這些程式庫的實作。