PyTorch トレーニングスクリプトを変更 - Amazon SageMaker

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

PyTorch トレーニングスクリプトを変更

このセクションでは、 PyTorch トレーニングスクリプトを変更して、 SageMaker 自動パーティショニングと手動パーティショニング用のモデル並列処理ライブラリを設定する方法を学習します。

注記

PyTorch ライブラリがどのバージョンをサポートしているかを確認するには、を参照してください。サポートされるフレームワークと AWS リージョン

ヒント

end-to-end PyTorch SageMaker モデル並列処理ライブラリでトレーニングスクリプトを使用する方法を示すノートブックの例については、を参照してください。Amazon SageMaker モデル並列処理ライブラリ v1 の例

自動パーティショニングはデフォルトで有効になっています。別に指定されていない限り、次のスクリプトでは自動パーティショニングが使用されます。

による自動分割 PyTorch

SageMakerのモデル並列処理ライブラリでトレーニングスクリプトを実行するには、 PyTorch 以下のトレーニングスクリプトの変更が必要です。

  1. ライブラリをインポートし、smdistributed.modelparallel.torch.init() で初期化します。

  2. モデルを smdistributed.modelparallel.torch.DistributedModel でラップします。基盤となる nn.Module オブジェクトの forward メソッドから返されたテンソルはモデル並列デバイス間でブロードキャストされるため、通信オーバーヘッドが発生します。したがって、call メソッド範囲外の不要なテンソル (中間アクティベーションなど) は返さないように注意してください。

    注記

    FP16 のトレーニングでは、smdistributed.modelparallel.torch.model_creation() コンテキストマネージャーを使用してモデルをラップする必要があります。詳細については、「FP16 モデル並列処理によるトレーニング」を参照してください。

  3. オプティマイザを smdistributed.modelparallel.torch.DistributedOptimizer でラップします。

    注記

    FP16 トレーニングでは、静的または動的な損失スケーリングを設定する必要があります。詳細については、「FP16 モデル並列処理によるトレーニング」を参照してください。

  4. ユーザーモデルの代わりに、返された DistributedModel オブジェクトを使用します。

  5. フォワードおよびバックワードロジックをステップ関数に入れ、smdistributed.modelparallel.torch.step で修飾します。

  6. torch.cuda.set_device(smp.local_rank()) を使って、各プロセスをそれ固有のデバイスに制限します。

  7. smp.step 呼び出しの前に .to() APIを使って入力テンソルを GPU に移動します (次の例を参照)。

  8. torch.Tensor.backwardtorch.autograd.backwardDistributedModel.backward に置き換えます。

  9. reduce_mean などの StepOutput メソッドを使って、マイクロバッチ全体の出力に対して後処理を実行します。

  10. 評価ステップがある場合は、同様にフォワードロジックを smp.step で修飾された関数内に配置し、StepOutput API を使って出力を後処理します。

  11. DataLoaderdrop_last=True を設定します。または、バッチサイズがマイクロバッチ数で割り切れない場合は、トレーニングループ内でバッチを手動でスキップします。

SageMakerのモデル並列処理ライブラリ API の詳細については、API ドキュメントを参照してください。

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() # define layers def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

による手動分割 PyTorch

smp.partition コンテキストマネージャーを使って、特定のデバイスにモジュールを配置します。どの smp.partition コンテキストにも配置されていないモジュールは、default_partition に配置されます。auto_partitionFalse に設定されている場合は、default_partition を指定する必要があります。特定の smp.partition コンテキスト内に作成されたモジュールは、対応するパーティションに配置されます。

SageMakerのモデル並列処理ライブラリ API の詳細については、API ドキュメントを参照してください。

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class GroupedNet(nn.Module): def __init__(self): super(GroupedNet, self).__init__() with smp.partition(0): # define child modules on device 0 with smp.partition(1): # define child modules on device 1 def forward(self, x): # define forward pass and return model outputs # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time dataset = datasets.MNIST("../data", train=True, download=False) # smdistributed: Shard the dataset based on data-parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = GroupedNet() optimizer = optim.Adadelta(model.parameters(), lr=4.0) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

考慮事項

PyTorch SageMakerのモデル並列処理ライブラリを使用してトレーニングスクリプトを設定する場合、次の点に注意する必要があります。

  • グローバル勾配ノルム (例えば、LAMB オプティマイザの一部のバリアントやグローバル勾配クリッピングなど、モデル全体からの勾配ノルム) に依存する最適化手法を使っている場合は、正確性確保のために、モデルパーティション全体のすべてのノルムを収集する必要があります。これを行うには、ライブラリの通信基本データタイプを使用できます。

  • モデル内の nn.Modules の forward メソッドに渡すすべての torch.Tensor 引数は、モジュール出力の計算に使う必要があります。つまり、ライブラリは、モジュール出力が依存しないモジュールに渡す torch.Tensor 引数が存在するケースをサポートしていません。

  • smp.DistributedModel.backward() 呼び出しに渡す引数は、すべてのモデル出力に依存する必要があります。つまり、smp.DistributedModel.backward 呼び出しに供給されるテンソルの計算で使用されない smp.DistributedModel.forward 呼び出しからの出力はあり得ません。

  • コードに torch.cuda.synchronize() 呼び出しがある場合は、同期呼び出しの直前に torch.cuda.set_device(smp.local_rank()) を呼び出す必要がある場合があります。そうしないと、デバイス 0 に不要な CUDA コンテキストが作成され、メモリを不必要に消費する場合があります。

  • ライブラリは nn.Modules を異なるデバイスに配置するため、モデル内のモジュールは、smp.step 内で変更されるグローバル状態に依存してはいけません。トレーニング中に固定されたままの状態や、すべてのプロセスに表示される方法で smp.step の外部で変更される状態は、すべて許可されます。

  • ライブラリを使う場合、GPU にモデルを移動する必要はありません (例えば、model.to(device) を使って)。モデルがパーティショニングされる前 (最初の smp.step 呼び出しの前) にモデルを GPU に移動しようとすると、move 呼び出しは無視されます。ライブラリは、ランクに割り当てられたモデルの一部を GPU に自動的に移動します。ライブラリによるトレーニングが開始したら、モデルを CPU に移動して使用しないでください。モデルにはモジュールの正しいパラメータがなく、プロセスによって保持されているパーティションに割り当てられないためです。モデル並列処理ライブラリを使用してトレーニングした後に、モデルを再トレーニングしたり、ライブラリなしで推論に使用したりする場合は、チェックポインティング API を使用してモデル全体を保存し、通常のモジュールに読み込むことをお勧めします。 PyTorch

  • あるモジュールの出力が別のモジュールにフィードされるようなモジュールのリストがある場合、そのリストを nn.Sequential に置き換えると、パフォーマンスを大幅に向上させることができます。

  • 重みの更新 (optimizer.step()) は、smp.step の外部で実行される必要があります。このときに、バックワードパス全体が終了し、勾配が使用可能になるためです。モデルとデータの並列処理を行うハイブリッドモデルを使用する場合、この時点で勾配も確実に終了します。 AllReduce

  • ライブラリをデータ並列処理と組み合わせて使用する場合は、 AllReduce ステップに参加していないランクを待ってハングアップしないように、すべてのデータparallel ランクのバッチ数が同じであることを確認してください。

  • ml.p4d インスタンスタイプ (ml.p4d.24xlarge など) を使ってトレーニングジョブを起動する場合は、データローダー変数 num_workers=0 を設定する必要があります。例えば、DataLoader を次のように定義できます。

    dataloader = torch.utils.data.DataLoader( data, batch_size=batch_size, num_workers=0, pin_memory=True, drop_last=True, shuffle=shuffle, )
  • smp.step への入力は、DataLoader によって生成されたモデル入力である必要があります。これは、smp.step が内部で入力テンソルをバッチディメンションに沿って分割し、パイプライン化するためです。つまり、DataLoader 自体を smp.step 関数に渡して、内部でモデル入力を生成しようとしてもうまく行きません。

    例えば、DataLoader を次のように定義する場合:

    train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

    train_loader によって生成されたモデル入力にアクセスし、それらを smp.step で修飾された関数に渡してください。train_loadersmp.step 関数に直接渡さないでください。

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): ... _, loss_mb = train_step(model, data, target) ... @smp.step def train_step(model, data, target): ... return output, loss
  • smp.step への入力テンソルは、.to() API を使って現在のデバイスに移動する必要があります。これは、torch.cuda.set_device(local_rank()) 呼び出しの後に実行する必要があります。

    例えば、train 関数を次のように定義します。この関数は、.to() API を使って現在のデバイスに datatarget を追加してから、これらの入力テンソルを使って train_step を呼び出します。

    def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by the current process, # based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step()

    この smp.set で修飾された関数への入力テンソルは、上記の train 関数で現在のデバイスに移動されました。モデルを現在のデバイスに移動する必要はありません。ライブラリは、ランクに割り当てられたモデルの一部を GPU に自動的に移動します。

    @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss

サポートされていないフレームワーク機能

PyTorch SageMaker以下の機能はのモデル並列処理ライブラリではサポートされていません。

  • ネイティブ PyTorch DDP でデータ並列処理を使用する場合、torch.nn.parallel.DistributedDataParallelラッパーモジュールはライブラリでサポートされません。ライブラリは、パラメータブロードキャストやグラデーションなどの PyTorch DDP との統合を内部的に管理します。 AllReduceライブラリを使う場合、モジュールバッファはトレーニングの開始時に 1 回だけブロードキャストされます。モデルにモジュールバッファがあり、各ステップでデータ並列グループ間で同期させる必要がある場合は、smp.get_dp_process_group() で取得できるプロセスグループを使って、torch.distributed API を使ってそれを行えます。

  • 混合精度トレーニングでは、apex.amp モジュールはサポートされていません。自動混合精度でライブラリを使う推奨方法は、torch.cuda.amp を使用することです。ただし、Torch の実装の代わりに smp.amp.GradScaler を使用することを除きます。

  • torch.jit.ScriptModules または ScriptFunctions は、smp.DistributedModel ではサポートされていません。

  • apex : apex からの FusedLayerNormFusedAdamFusedLAMBFusedNovoGrad はサポートされていません。代わりに、smp.optimizers および smp.nn API を介してこれらのライブラリ実装を使用できます。