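In this section, you learn:
- How to configure a SageMaker PyTorch estimator and the SageMaker model parallelism options to use tensor parallelism.
- How to adapt your training script with the extended smdistributed.modelparallel modules for tensor parallelism.
To learn more about the smdistributed.modelparallel modules, see the SageMaker Python SDK documentation.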
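Tensor parallelism alone
The following example shows distributed training options that enable tensor parallelism on its own, without pipeline parallelism. Configure the mpi_options and smp_options dictionaries to pass the distributed training options to the SageMaker PyTorch estimator.
Note
Extended memory-saving features are available through Deep Learning Containers for PyTorch, which implement the SageMaker model parallelism library v1.6.0 or later.
Configure a SageMaker PyTorch estimator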
import sagemaker
from sagemaker.pytorch import PyTorch

# Assumption: this runs where a SageMaker execution role is available
# (for example, a SageMaker notebook). Otherwise, set role to your IAM role ARN.
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

mpi_options = {
    "enabled": True,
    "processes_per_host": 8,  # 8 processes
    "custom_mpi_options": "--mca btl_vader_single_copy_mechanism none "
}

smp_options = {
    "enabled": True,
    "parameters": {
        "pipeline_parallel_degree": 1,  # alias for "partitions"
        "placement_strategy": "cluster",
        "tensor_parallel_degree": 4,    # tp over 4 devices
        "ddp": True
    }
}

smp_estimator = PyTorch(
    entry_point='your_training_script.py',  # Specify your training script
    role=role,
    instance_type='ml.p3.16xlarge',
    sagemaker_session=sagemaker_session,
    framework_version='1.13.1',
    py_version='py39',  # the PyTorch 1.13.1 containers run Python 3.9, not 3.6
    instance_count=1,
    distribution={
        "smdistributed": {"modelparallel": smp_options},
        "mpi": mpi_options
    },
    base_job_name="SMD-MP-demo",
)

smp_estimator.fit('s3://my_bucket/my_training_data/')
Tip
To find the complete list of parameters for distribution, see Configuration Parameters for Model Parallelism in the SageMaker Python SDK documentation.
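The configuration above sets the tensor and pipeline parallel degrees but does not state a data parallel degree. The sketch below shows the arithmetic that is usually applied, under the assumption that with "ddp" enabled every process not used for model parallelism holds another replica of the model. The helper implied_data_parallel_degree is hypothetical and is not part of the SageMaker SDK.

```python
def implied_data_parallel_degree(processes_per_host, instance_count,
                                 tensor_parallel_degree, pipeline_parallel_degree):
    """Hypothetical helper: number of model replicas implied by the settings.

    Assumes one process per GPU and that the total process count must be a
    multiple of tensor_parallel_degree * pipeline_parallel_degree.
    """
    world_size = processes_per_host * instance_count
    model_parallel_size = tensor_parallel_degree * pipeline_parallel_degree
    if world_size % model_parallel_size != 0:
        raise ValueError(
            f"{world_size} processes cannot be divided into groups of "
            f"{model_parallel_size}"
        )
    return world_size // model_parallel_size


# Values taken from mpi_options and smp_options above.
dp_degree = implied_data_parallel_degree(
    processes_per_host=8,
    instance_count=1,
    tensor_parallel_degree=4,
    pipeline_parallel_degree=1,
)
print(dp_degree)  # 2
```

Under this assumption, the eight processes on one ml.p3.16xlarge instance form two replicas, each spread over four GPUs.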
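Adapt your PyTorch training script
The following example training script shows how to adapt a training script to the SageMaker model parallelism library. In this example, the script is assumed to be named your_training_script.py.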
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchnet.dataset import SplitDataset
from torchvision import datasets, transforms

import smdistributed.modelparallel.torch as smp


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        return F.log_softmax(x, 1)


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # smdistributed: Move input tensors to the GPU ID used by
        # the current process, based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target, reduction="mean")
        loss.backward()
        optimizer.step()


# smdistributed: Initialize the backend
smp.init()

# smdistributed: Set the device to the GPU ID used by the current process.
# Input tensors should be transferred to this device.
torch.cuda.set_device(smp.local_rank())
device = torch.device("cuda")

# smdistributed: Download only on a single process per instance.
# Without this guard, multiple processes that download and extract
# at the same time can corrupt the files.
if smp.local_rank() == 0:
    datasets.MNIST("../data", train=True, download=True)
smp.barrier()

# Every process loads the data that local rank 0 downloaded. ToTensor converts
# the PIL images to tensors so that the DataLoader can batch them.
dataset = datasets.MNIST(
    "../data", train=True, download=False, transform=transforms.ToTensor()
)

# smdistributed: Shard the dataset based on data parallel ranks
if smp.dp_size() > 1:
    partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
    dataset = SplitDataset(dataset, partitions=partitions_dict)
    dataset.select(f"{smp.dp_rank()}")

train_loader = torch.utils.data.DataLoader(dataset, batch_size=64)

# smdistributed: Enable tensor parallelism for all supported modules in the model
# i.e., nn.Linear in this case. Alternatively, we can use
# smp.set_tensor_parallelism(model.fc1, True)
# to enable it only for model.fc1
with smp.tensor_parallelism():
    model = Net()

# smdistributed: Use the DistributedModel wrapper to distribute the
# modules for which tensor parallelism is enabled
model = smp.DistributedModel(model)

# The PyTorch class is optim.Adadelta (optim.AdaDelta does not exist).
optimizer = optim.Adadelta(model.parameters(), lr=4.0)
optimizer = smp.DistributedOptimizer(optimizer)

train(model, device, train_loader, optimizer)
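To make the idea behind tensor parallelism for nn.Linear more concrete, here is a minimal pure-Python sketch that needs no GPU. It assumes one common scheme, splitting the weight matrix by output features so that each device computes a slice of the output. This page does not describe how the SageMaker library partitions layers internally, so treat the sketch as an illustration of the concept only. The function names linear and tensor_parallel_linear are hypothetical.

```python
def linear(x, weight, bias):
    """Compute y = weight @ x + bias for one input vector, using plain lists."""
    return [
        sum(w * x_j for w, x_j in zip(row, x)) + b
        for row, b in zip(weight, bias)
    ]


def tensor_parallel_linear(x, weight, bias, degree):
    """Hypothetical sketch: split the output features across `degree` workers."""
    out_features = len(weight)
    if out_features % degree != 0:
        raise ValueError("out_features must be divisible by the parallel degree")
    shard = out_features // degree

    partial_outputs = []
    for rank in range(degree):
        rows = slice(rank * shard, (rank + 1) * shard)
        # Each simulated device holds only its rows of the weight and bias.
        partial_outputs.append(linear(x, weight[rows], bias[rows]))

    # Concatenating the slices plays the role of an all-gather across devices.
    return [value for part in partial_outputs for value in part]


x = [1.0, 2.0, 3.0]
weight = [
    [0.5, -1.0, 2.0],
    [1.0, 0.0, 1.0],
    [0.0, 1.0, -1.0],
    [2.0, 2.0, 2.0],
]
bias = [0.1, 0.2, 0.3, 0.4]

full_output = linear(x, weight, bias)
sharded_output = tensor_parallel_linear(x, weight, bias, degree=2)
print(full_output == sharded_output)  # True
```

Each simulated device stores only half of the weight rows, which is where the memory saving comes from; the price is the extra communication step that reassembles the output.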
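Tensor parallelism combined with pipeline parallelism
The following example shows distributed training options that combine tensor parallelism with pipeline parallelism. When you configure the SageMaker PyTorch estimator, set the mpi_options and smp_options parameters to specify the model parallel options with tensor parallelism.
Note
Extended memory-saving features are available through Deep Learning Containers for PyTorch, which implement the SageMaker model parallelism library v1.6.0 or later.
Configure a SageMaker PyTorch estimator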
import sagemaker
from sagemaker.pytorch import PyTorch

# Assumption: this runs where a SageMaker execution role is available
# (for example, a SageMaker notebook). Otherwise, set role to your IAM role ARN.
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

mpi_options = {
    "enabled": True,
    "processes_per_host": 8,  # 8 processes
    "custom_mpi_options": "--mca btl_vader_single_copy_mechanism none "
}

smp_options = {
    "enabled": True,
    "parameters": {
        "microbatches": 4,
        "pipeline_parallel_degree": 2,  # alias for "partitions"
        "placement_strategy": "cluster",
        "tensor_parallel_degree": 2,    # tp over 2 devices
        "ddp": True
    }
}

smp_estimator = PyTorch(
    entry_point='your_training_script.py',  # Specify your training script
    role=role,
    instance_type='ml.p3.16xlarge',
    sagemaker_session=sagemaker_session,
    framework_version='1.13.1',
    py_version='py39',  # the PyTorch 1.13.1 containers run Python 3.9, not 3.6
    instance_count=1,
    distribution={
        "smdistributed": {"modelparallel": smp_options},
        "mpi": mpi_options
    },
    base_job_name="SMD-MP-demo",
)

smp_estimator.fit('s3://my_bucket/my_training_data/')
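The "microbatches" parameter above splits each batch into smaller pieces that move through the pipeline stages. The following sketch illustrates the split in plain Python, assuming a per-process batch of 64 samples and an even split; split_into_microbatches is a hypothetical helper, not a library function.

```python
def split_into_microbatches(batch, num_microbatches):
    """Hypothetical helper: evenly split one batch into microbatches."""
    if len(batch) % num_microbatches != 0:
        raise ValueError(
            f"batch of {len(batch)} samples is not divisible by "
            f"{num_microbatches} microbatches"
        )
    size = len(batch) // num_microbatches
    return [batch[i * size:(i + 1) * size] for i in range(num_microbatches)]


batch = list(range(64))  # stand-in for 64 training samples
microbatches = split_into_microbatches(batch, num_microbatches=4)
print([len(mb) for mb in microbatches])  # [16, 16, 16, 16]
```

A final, shorter batch (for example 32 + 1 leftover samples) would fail the divisibility check, which is why a data loader feeding such a pipeline typically drops the last incomplete batch.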
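Adapt your PyTorch training script
The following example training script shows how to adapt a training script to the SageMaker model parallelism library. Note that the training script now includes the smp.step decorator: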
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchnet.dataset import SplitDataset
from torchvision import datasets, transforms

import smdistributed.modelparallel.torch as smp


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        return F.log_softmax(x, 1)


# smdistributed: Define smp.step. Return any tensors needed outside.
@smp.step
def train_step(model, data, target):
    output = model(data)
    loss = F.nll_loss(output, target, reduction="mean")
    model.backward(loss)
    return output, loss


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # smdistributed: Move input tensors to the GPU ID used by
        # the current process, based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        # Return value, loss_mb is a StepOutput object
        _, loss_mb = train_step(model, data, target)

        # smdistributed: Average the loss across microbatches.
        loss = loss_mb.reduce_mean()

        optimizer.step()


# smdistributed: Initialize the backend
smp.init()

# smdistributed: Set the device to the GPU ID used by the current process.
# Input tensors should be transferred to this device.
torch.cuda.set_device(smp.local_rank())
device = torch.device("cuda")

# smdistributed: Download only on a single process per instance.
# Without this guard, multiple processes that download and extract
# at the same time can corrupt the files.
if smp.local_rank() == 0:
    datasets.MNIST("../data", train=True, download=True)
smp.barrier()

# Every process loads the data that local rank 0 downloaded. ToTensor converts
# the PIL images to tensors so that the DataLoader can batch them.
dataset = datasets.MNIST(
    "../data", train=True, download=False, transform=transforms.ToTensor()
)

# smdistributed: Shard the dataset based on data parallel ranks
if smp.dp_size() > 1:
    partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
    dataset = SplitDataset(dataset, partitions=partitions_dict)
    dataset.select(f"{smp.dp_rank()}")

# smdistributed: Set drop_last=True to ensure that batch size is always divisible
# by the number of microbatches
train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

model = Net()

# smdistributed: enable tensor parallelism only for model.fc1
smp.set_tensor_parallelism(model.fc1, True)

# smdistributed: Use the DistributedModel container to provide the model
# to be partitioned across different ranks. For the rest of the script,
# the returned DistributedModel object should be used in place of
# the model provided for DistributedModel class instantiation.
model = smp.DistributedModel(model)

# The PyTorch class is optim.Adadelta (optim.AdaDelta does not exist).
optimizer = optim.Adadelta(model.parameters(), lr=4.0)
optimizer = smp.DistributedOptimizer(optimizer)

train(model, device, train_loader, optimizer)
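The script averages the per-microbatch losses with loss_mb.reduce_mean(). The plain-Python sketch below illustrates why that average matches the ordinary full-batch mean loss when all microbatches have the same size. The loss values are made up for illustration, and the sketch does not call the library.

```python
def mean(values):
    """Arithmetic mean of a non-empty list of numbers."""
    return sum(values) / len(values)


# Made-up per-sample losses for a batch of 8 samples (illustrative only).
per_sample_losses = [0.5, 1.5, 1.0, 3.0, 2.0, 2.0, 0.25, 0.75]
num_microbatches = 4
size = len(per_sample_losses) // num_microbatches

# Each microbatch reports its own mean loss, as a step function that
# uses reduction="mean" would.
microbatch_losses = [
    mean(per_sample_losses[i * size:(i + 1) * size])
    for i in range(num_microbatches)
]

# Averaging the per-microbatch means recovers the full-batch mean
# because every microbatch contains the same number of samples.
averaged_loss = mean(microbatch_losses)
full_batch_loss = mean(per_sample_losses)
print(abs(averaged_loss - full_batch_loss) < 1e-12)  # True
```

If the microbatches had unequal sizes, the two numbers would generally differ, which is one more reason to keep the batch size divisible by the number of microbatches.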