翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
PyTorch トレーニングスクリプトを変更
このセクションでは、 PyTorch トレーニングスクリプトを変更して、 SageMaker 自動パーティショニングと手動パーティショニング用のモデル並列処理ライブラリを設定する方法を学習します。
注記
PyTorch ライブラリがどのバージョンをサポートしているかを確認するには、を参照してください。サポートされるフレームワークと AWS リージョン
ヒント
end-to-end PyTorch SageMaker モデル並列処理ライブラリでトレーニングスクリプトを使用する方法を示すノートブックの例については、を参照してください。Amazon SageMaker モデル並列処理ライブラリ v1 の例
自動パーティショニングはデフォルトで有効になっています。別に指定されていない限り、次のスクリプトでは自動パーティショニングが使用されます。
による自動分割 PyTorch
SageMakerのモデル並列処理ライブラリでトレーニングスクリプトを実行するには、 PyTorch 以下のトレーニングスクリプトの変更が必要です。
-
ライブラリをインポートし、
smdistributed.modelparallel.torch.init()
で初期化します。 -
モデルを
smdistributed.modelparallel.torch.DistributedModel
でラップします。基盤となる nn.Module
オブジェクトのforward
メソッドから返されたテンソルはモデル並列デバイス間でブロードキャストされるため、通信オーバーヘッドが発生します。したがって、call メソッド範囲外の不要なテンソル (中間アクティベーションなど) は返さないように注意してください。注記
FP16 のトレーニングでは、smdistributed.modelparallel.torch.model_creation()
コンテキストマネージャーを使用してモデルをラップする必要があります。詳細については、「FP16 モデル並列処理によるトレーニング」を参照してください。 -
オプティマイザを
smdistributed.modelparallel.torch.DistributedOptimizer
でラップします。 注記
FP16 トレーニングでは、静的または動的な損失スケーリングを設定する必要があります。詳細については、「FP16 モデル並列処理によるトレーニング」を参照してください。
-
ユーザーモデルの代わりに、返された
DistributedModel
オブジェクトを使用します。 -
フォワードおよびバックワードロジックをステップ関数に入れ、
smdistributed.modelparallel.torch.step
で修飾します。 -
torch.cuda.set_device(smp.local_rank())
を使って、各プロセスをそれ固有のデバイスに制限します。 -
smp.step
呼び出しの前に.to()
APIを使って入力テンソルを GPU に移動します (次の例を参照)。 -
torch.Tensor.backward
とtorch.autograd.backward
をDistributedModel.backward
に置き換えます。 -
reduce_mean
などのStepOutput
メソッドを使って、マイクロバッチ全体の出力に対して後処理を実行します。 -
評価ステップがある場合は、同様にフォワードロジックを
smp.step
で修飾された関数内に配置し、StepOutput
APIを使って出力を後処理します。 -
DataLoader
でdrop_last=True
を設定します。または、バッチサイズがマイクロバッチ数で割り切れない場合は、トレーニングループ内でバッチを手動でスキップします。
SageMakerのモデル並列処理ライブラリ API の詳細については、API ドキュメントを参照してください。
import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() # define layers def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)
による手動分割 PyTorch
smp.partition
smp.partition
コンテキストにも配置されていないモジュールは、default_partition
に配置されます。auto_partition
が False
に設定されている場合は、default_partition
を指定する必要があります。特定の smp.partition
コンテキスト内に作成されたモジュールは、対応するパーティションに配置されます。
SageMakerのモデル並列処理ライブラリ API の詳細については、API ドキュメントを参照してください。
import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() with smp.partition(0): # define child modules on device 0 with smp.partition(1): # define child modules on device 1 def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)
考慮事項
PyTorch SageMakerのモデル並列処理ライブラリを使用してトレーニングスクリプトを設定する場合、次の点に注意する必要があります。
-
グローバル勾配ノルム (例えば、LAMB オプティマイザの一部のバリアントやグローバル勾配クリッピングなど、モデル全体からの勾配ノルム) に依存する最適化手法を使っている場合は、正確性確保のために、モデルパーティション全体のすべてのノルムを収集する必要があります。これを行うには、ライブラリの通信基本データタイプを使用できます。
-
モデル内の
nn.Modules
の forward メソッドに渡すすべてのtorch.Tensor
引数は、モジュール出力の計算に使う必要があります。つまり、ライブラリは、モジュール出力が依存しないモジュールに渡すtorch.Tensor
引数が存在するケースをサポートしていません。 -
smp.DistributedModel.backward()
呼び出しに渡す引数は、すべてのモデル出力に依存する必要があります。つまり、smp.DistributedModel.backward
呼び出しに供給されるテンソルの計算で使用されないsmp.DistributedModel.forward
呼び出しからの出力はあり得ません。 -
コードに
torch.cuda.synchronize()
呼び出しがある場合は、同期呼び出しの直前にtorch.cuda.set_device(smp.local_rank())
を呼び出す必要がある場合があります。そうしないと、デバイス 0 に不要な CUDA コンテキストが作成され、メモリを不必要に消費する場合があります。 -
ライブラリは
nn.Modules
を異なるデバイスに配置するため、モデル内のモジュールは、smp.step
内で変更されるグローバル状態に依存してはいけません。トレーニング中に固定されたままの状態や、すべてのプロセスに表示される方法でsmp.step
の外部で変更される状態は、すべて許可されます。 -
ライブラリを使う場合、GPU にモデルを移動する必要はありません (例えば、
model.to(device)
を使って)。モデルがパーティショニングされる前 (最初のsmp.step
呼び出しの前) にモデルを GPU に移動しようとすると、move 呼び出しは無視されます。ライブラリは、ランクに割り当てられたモデルの一部を GPU に自動的に移動します。ライブラリによるトレーニングが開始したら、モデルを CPU に移動して使用しないでください。モデルにはモジュールの正しいパラメータがなく、プロセスによって保持されているパーティションに割り当てられないためです。モデル並列処理ライブラリを使用してトレーニングした後に、モデルを再トレーニングしたり、ライブラリなしで推論に使用したりする場合は、チェックポインティング API を使用してモデル全体を保存し、通常のモジュールに読み込むことをお勧めします。 PyTorch -
あるモジュールの出力が別のモジュールにフィードされるようなモジュールのリストがある場合、そのリストを
nn.Sequential
に置き換えると、パフォーマンスを大幅に向上させることができます。 -
重みの更新 (
optimizer.step()
) は、smp.step
の外部で実行される必要があります。このときに、バックワードパス全体が終了し、勾配が使用可能になるためです。モデルとデータの並列処理を行うハイブリッドモデルを使用する場合、この時点で勾配も確実に終了します。 AllReduce -
ライブラリをデータ並列処理と組み合わせて使用する場合は、 AllReduce ステップに参加していないランクを待ってハングアップしないように、すべてのデータparallel ランクのバッチ数が同じであることを確認してください。
-
ml.p4d インスタンスタイプ (ml.p4d.24xlarge など) を使ってトレーニングジョブを起動する場合は、データローダー変数
num_workers=0
を設定する必要があります。例えば、DataLoader
を次のように定義できます。dataloader = torch.utils.data.DataLoader( data, batch_size=batch_size, num_workers=0, pin_memory=True, drop_last=True, shuffle=shuffle, )
-
smp.step
への入力は、DataLoader
によって生成されたモデル入力である必要があります。これは、smp.step
が内部で入力テンソルをバッチディメンションに沿って分割し、パイプライン化するためです。つまり、DataLoader
自体をsmp.step
関数に渡して、内部でモデル入力を生成しようとしてもうまく行きません。例えば、
DataLoader
を次のように定義する場合:train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)
train_loader
によって生成されたモデル入力にアクセスし、それらをsmp.step
で修飾された関数に渡してください。train_loader
をsmp.step
関数に直接渡さないでください。def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): ... _, loss_mb = train_step(model, data, target) ... @smp.step def train_step(model, data, target): ... return output, loss
-
smp.step
への入力テンソルは、.to()
API を使って現在のデバイスに移動する必要があります。これは、torch.cuda.set_device(local_rank())
呼び出しの後に実行する必要があります。例えば、
train
関数を次のように定義します。この関数は、.to()
API を使って現在のデバイスにdata
とtarget
を追加してから、これらの入力テンソルを使ってtrain_step
を呼び出します。def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step()
この
smp.set
で修飾された関数への入力テンソルは、上記のtrain
関数で現在のデバイスに移動されました。モデルを現在のデバイスに移動する必要はありません。ライブラリは、ランクに割り当てられたモデルの一部を GPU に自動的に移動します。@smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss
サポートされていないフレームワーク機能
PyTorch SageMaker以下の機能はのモデル並列処理ライブラリではサポートされていません。
-
ネイティブ PyTorch DDP
でデータ並列処理を使用する場合、 torch.nn.parallel.DistributedDataParallel
ラッパーモジュールはライブラリでサポートされません。ライブラリは、パラメータブロードキャストやグラデーションなどの PyTorch DDP との統合を内部的に管理します。 AllReduceライブラリを使う場合、モジュールバッファはトレーニングの開始時に 1 回だけブロードキャストされます。モデルにモジュールバッファがあり、各ステップでデータ並列グループ間で同期させる必要がある場合は、 smp.get_dp_process_group()
で取得できるプロセスグループを使って、torch.distributed
API を使ってそれを行えます。 -
混合精度トレーニングでは、
apex.amp
モジュールはサポートされていません。自動混合精度でライブラリを使う推奨方法は、torch.cuda.amp
を使用することです。ただし、Torch の実装の代わりにsmp.amp.GradScaler
を使用することを除きます。 -
torch.jit.ScriptModules
またはScriptFunctions
は、smp.DistributedModel
ではサポートされていません。 -
apex
:apex
からのFusedLayerNorm
、FusedAdam
、FusedLAMB
、FusedNovoGrad
はサポートされていません。代わりに、smp.optimizers
およびsmp.nn
API を介してこれらのライブラリ実装を使用できます。