Run a SageMaker Distributed Model Parallel Training Job with Tensor Parallelism
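In this section, you learn:
- How to configure a SageMaker PyTorch estimator and the SageMaker model parallelism options to use tensor parallelism.
- How to adapt your training script using the extended smdistributed.modelparallel modules for tensor parallelism.
To learn more about the smdistributed.modelparallel modules, see the SageMaker model parallel APIs in the SageMaker Python SDK documentation.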
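Tensor parallelism alone
The following example shows distributed training options that enable tensor parallelism alone, without pipeline parallelism. Configure the mpi_options and smp_options dictionaries to pass the distributed training options to the SageMaker PyTorch estimator.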
Note
Extended memory-saving features are available through Deep Learning Containers for PyTorch, which implement the SageMaker model parallelism library v1.6.0 or later.
Configure a SageMaker PyTorch estimator
import sagemaker
from sagemaker.pytorch import PyTorch

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()  # IAM role assumed by the training job

mpi_options = {
    "enabled": True,
    "processes_per_host": 8,  # 8 processes
    "custom_mpi_options": "--mca btl_vader_single_copy_mechanism none "
}

smp_options = {
    "enabled": True,
    "parameters": {
        "pipeline_parallel_degree": 1,  # alias for "partitions"
        "placement_strategy": "cluster",
        "tensor_parallel_degree": 4,  # tp over 4 devices
        "ddp": True
    }
}

smp_estimator = PyTorch(
    entry_point='your_training_script.py',  # Specify your training script
    role=role,
    instance_type='ml.p3.16xlarge',
    sagemaker_session=sagemaker_session,
    framework_version='1.13.1',
    py_version='py39',  # PyTorch 1.13.1 does not support Python 3.6
    instance_count=1,
    distribution={
        "smdistributed": {"modelparallel": smp_options},
        "mpi": mpi_options
    },
    base_job_name="SMD-MP-demo",
)

smp_estimator.fit('s3://my_bucket/my_training_data/')
Tip
To find a complete list of parameters for distribution, see Configuration Parameters for Model Parallelism in the SageMaker Python SDK documentation.
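As a rough check of how the options above fit together, the following sketch computes how many processes a job launches and how many model replicas are left once pipeline and tensor parallelism are accounted for. The helper parallel_layout is hypothetical and not part of the SageMaker SDK. It assumes that the total number of processes must be divisible by the product of the pipeline-parallel and tensor-parallel degrees; confirm this against the library documentation before relying on it.

```python
def parallel_layout(processes_per_host, instance_count,
                    pipeline_parallel_degree, tensor_parallel_degree):
    """Hypothetical helper: summarize how processes are divided.

    Assumption: every model replica occupies
    pipeline_parallel_degree * tensor_parallel_degree processes.
    """
    world_size = processes_per_host * instance_count
    per_replica = pipeline_parallel_degree * tensor_parallel_degree
    if world_size % per_replica != 0:
        raise ValueError(
            f"{world_size} processes cannot be divided into groups of {per_replica}"
        )
    return {
        "world_size": world_size,
        "processes_per_replica": per_replica,
        "replicas": world_size // per_replica,
    }


# Values taken from the tensor-parallelism-only configuration.
layout = parallel_layout(
    processes_per_host=8,
    instance_count=1,
    pipeline_parallel_degree=1,
    tensor_parallel_degree=4,
)
print(layout)
```

Under these assumptions, 8 processes with a tensor-parallel degree of 4 leave room for 2 replicas of the model, which is why the training script shards the dataset across ranks.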
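Adapt your PyTorch training script
The following example training script shows how to adapt a training script to use the SageMaker model parallelism library. In this example, the script is assumed to be named your_training_script.py.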
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchnet.dataset import SplitDataset
from torchvision import datasets, transforms

import smdistributed.modelparallel.torch as smp


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        return F.log_softmax(x, 1)


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # smdistributed: Move input tensors to the GPU ID used by
        # the current process, based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target, reduction="mean")
        loss.backward()
        optimizer.step()


# smdistributed: Initialize the backend
smp.init()

# smdistributed: Set the device to the GPU ID used by the current process.
# Input tensors should be transferred to this device.
torch.cuda.set_device(smp.local_rank())
device = torch.device("cuda")

# smdistributed: Download only on a single process per instance.
# When this is not present, the file is corrupted by multiple processes trying
# to download and extract at the same time
if smp.local_rank() == 0:
    datasets.MNIST("../data", train=True, download=True)
smp.barrier()

# Every process loads the downloaded files; ToTensor lets the DataLoader
# collate the images into batches.
dataset = datasets.MNIST(
    "../data", train=True, download=False, transform=transforms.ToTensor()
)

# smdistributed: Shard the dataset based on data parallel ranks
if smp.dp_size() > 1:
    partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
    dataset = SplitDataset(dataset, partitions=partitions_dict)
    dataset.select(f"{smp.dp_rank()}")

train_loader = torch.utils.data.DataLoader(dataset, batch_size=64)

# smdistributed: Enable tensor parallelism for all supported modules in the model
# i.e., nn.Linear in this case. Alternatively, we can use
# smp.set_tensor_parallelism(model.fc1, True)
# to enable it only for model.fc1
with smp.tensor_parallelism():
    model = Net()

# smdistributed: Use the DistributedModel wrapper to distribute the
# modules for which tensor parallelism is enabled
model = smp.DistributedModel(model)

# The PyTorch optimizer class is named Adadelta.
optimizer = optim.Adadelta(model.parameters(), lr=4.0)
optimizer = smp.DistributedOptimizer(optimizer)

train(model, device, train_loader, optimizer)
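To make the idea behind tensor parallelism for a linear layer more concrete, here is a minimal pure-Python sketch. It splits the output features of one layer across several hypothetical ranks, lets each rank compute its slice, and concatenates the results. The functions linear and tensor_parallel_linear are illustrative names only; this is a conceptual model and not how the SageMaker library partitions or communicates in practice.

```python
def linear(x, weight, bias):
    """Single-sample linear layer: y[j] = sum_i x[i] * weight[j][i] + bias[j]."""
    return [
        sum(xi * wi for xi, wi in zip(x, row)) + b
        for row, b in zip(weight, bias)
    ]


def tensor_parallel_linear(x, weight, bias, tp_degree):
    """Conceptual sketch: each rank owns a contiguous slice of output features."""
    out_features = len(weight)
    if out_features % tp_degree != 0:
        raise ValueError("out_features must be divisible by tp_degree")
    shard = out_features // tp_degree
    output = []
    for rank in range(tp_degree):
        rows = slice(rank * shard, (rank + 1) * shard)
        # In a real job each slice would live on a different device.
        output.extend(linear(x, weight[rows], bias[rows]))
    return output


x = [1.0, 2.0]
weight = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [2.0, -1.0]]
bias = [0.0, 0.0, 0.5, 0.0]

full = linear(x, weight, bias)
sharded = tensor_parallel_linear(x, weight, bias, tp_degree=2)
print(full == sharded)
```

Each rank stores only its share of the weight matrix, which is where the memory saving comes from; the sharded result matches the unsharded layer.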
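Tensor parallelism combined with pipeline parallelism
The following example shows distributed training options that enable tensor parallelism combined with pipeline parallelism. Set the mpi_options and smp_options parameters to specify model parallel options with tensor parallelism when you configure a SageMaker PyTorch estimator.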
Note
Extended memory-saving features are available through Deep Learning Containers for PyTorch, which implement the SageMaker model parallelism library v1.6.0 or later.
Configure a SageMaker PyTorch estimator
import sagemaker
from sagemaker.pytorch import PyTorch

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()  # IAM role assumed by the training job

mpi_options = {
    "enabled": True,
    "processes_per_host": 8,  # 8 processes
    "custom_mpi_options": "--mca btl_vader_single_copy_mechanism none "
}

smp_options = {
    "enabled": True,
    "parameters": {
        "microbatches": 4,
        "pipeline_parallel_degree": 2,  # alias for "partitions"
        "placement_strategy": "cluster",
        "tensor_parallel_degree": 2,  # tp over 2 devices
        "ddp": True
    }
}

smp_estimator = PyTorch(
    entry_point='your_training_script.py',  # Specify your training script
    role=role,
    instance_type='ml.p3.16xlarge',
    sagemaker_session=sagemaker_session,
    framework_version='1.13.1',
    py_version='py39',  # PyTorch 1.13.1 does not support Python 3.6
    instance_count=1,
    distribution={
        "smdistributed": {"modelparallel": smp_options},
        "mpi": mpi_options
    },
    base_job_name="SMD-MP-demo",
)

smp_estimator.fit('s3://my_bucket/my_training_data/')
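The microbatches setting above means each batch is cut into smaller pieces that flow through the pipeline stages. The sketch below illustrates that split in plain Python. The function split_into_microbatches is a hypothetical helper, and the requirement that the batch size be divisible by the number of microbatches is taken from the comment in the training script that follows.

```python
def split_into_microbatches(batch, microbatches):
    """Hypothetical helper: cut a batch into equally sized microbatches."""
    if len(batch) % microbatches != 0:
        raise ValueError(
            f"batch of {len(batch)} samples is not divisible by {microbatches}"
        )
    size = len(batch) // microbatches
    return [batch[i * size:(i + 1) * size] for i in range(microbatches)]


# A batch of 64 sample indices, matching batch_size=64 and "microbatches": 4.
batch = list(range(64))
parts = split_into_microbatches(batch, microbatches=4)
print([len(p) for p in parts])
```

With these values every microbatch holds 16 samples. A trailing batch with, for example, 62 samples would fail the divisibility check.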
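Adapt your PyTorch training script
The following example training script shows how to adapt a training script to use the SageMaker model parallelism library. Note that the training script now includes the smp.step decorator: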
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchnet.dataset import SplitDataset
from torchvision import datasets, transforms

import smdistributed.modelparallel.torch as smp


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        return F.log_softmax(x, 1)


# smdistributed: Define smp.step. Return any tensors needed outside.
@smp.step
def train_step(model, data, target):
    output = model(data)
    loss = F.nll_loss(output, target, reduction="mean")
    model.backward(loss)
    return output, loss


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # smdistributed: Move input tensors to the GPU ID used by
        # the current process, based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        # Return value, loss_mb is a StepOutput object
        _, loss_mb = train_step(model, data, target)

        # smdistributed: Average the loss across microbatches.
        loss = loss_mb.reduce_mean()

        optimizer.step()


# smdistributed: Initialize the backend
smp.init()

# smdistributed: Set the device to the GPU ID used by the current process.
# Input tensors should be transferred to this device.
torch.cuda.set_device(smp.local_rank())
device = torch.device("cuda")

# smdistributed: Download only on a single process per instance.
# When this is not present, the file is corrupted by multiple processes trying
# to download and extract at the same time
if smp.local_rank() == 0:
    datasets.MNIST("../data", train=True, download=True)
smp.barrier()

# Every process loads the downloaded files; ToTensor lets the DataLoader
# collate the images into batches.
dataset = datasets.MNIST(
    "../data", train=True, download=False, transform=transforms.ToTensor()
)

# smdistributed: Shard the dataset based on data parallel ranks
if smp.dp_size() > 1:
    partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
    dataset = SplitDataset(dataset, partitions=partitions_dict)
    dataset.select(f"{smp.dp_rank()}")

# smdistributed: Set drop_last=True to ensure that batch size is always divisible
# by the number of microbatches
train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

model = Net()

# smdistributed: enable tensor parallelism only for model.fc1
smp.set_tensor_parallelism(model.fc1, True)

# smdistributed: Use the DistributedModel container to provide the model
# to be partitioned across different ranks. For the rest of the script,
# the returned DistributedModel object should be used in place of
# the model provided for DistributedModel class instantiation.
model = smp.DistributedModel(model)

# The PyTorch optimizer class is named Adadelta.
optimizer = optim.Adadelta(model.parameters(), lr=4.0)
optimizer = smp.DistributedOptimizer(optimizer)

train(model, device, train_loader, optimizer)
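The script averages the per-microbatch losses with reduce_mean. The following plain-Python sketch, which uses made-up loss values and a hypothetical mean_of_microbatch_means helper, suggests why equally sized microbatches matter: only then does the mean of the microbatch means equal the mean over the full batch. It does not use or reproduce the library's StepOutput implementation.

```python
def mean(values):
    return sum(values) / len(values)


def mean_of_microbatch_means(microbatch_losses):
    """Hypothetical stand-in for averaging a loss across microbatches."""
    return mean([mean(mb) for mb in microbatch_losses])


# Made-up per-sample losses for a batch of 8 samples.
per_sample = [0.5, 1.5, 2.0, 4.0, 1.0, 3.0, 0.0, 4.0]
full_batch_loss = mean(per_sample)

# Four equal microbatches of 2 samples each.
equal_split = [per_sample[i:i + 2] for i in range(0, 8, 2)]
equal_loss = mean_of_microbatch_means(equal_split)

# An uneven split (3 and 5 samples) weights samples differently.
uneven_split = [per_sample[:3], per_sample[3:]]
uneven_loss = mean_of_microbatch_means(uneven_split)

print(full_batch_loss, equal_loss, uneven_loss)
```

The equal split reproduces the full-batch mean, while the uneven split drifts away from it, which is consistent with dropping an incomplete final batch via drop_last=True.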