

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# SMDDP 集合演算を使用するようにトレーニングスクリプトを適応させる
<a name="data-parallel-modify-sdp-select-framework"></a>

このセクションで紹介するトレーニングスクリプトのサンプルは簡略化されており、SageMaker AI 分散データ並列処理 (SMDDP) ライブラリをトレーニングスクリプトで有効にするために必要な変更点だけを取り上げています。SMDDP ライブラリを使用して分散トレーニングジョブを実行する方法を示すエンドツーエンドの Jupyter Notebook の例については、「[Amazon SageMaker AI データ並列処理ライブラリの例](distributed-data-parallel-v2-examples.md)」を参照してください。

**Topics**
+ [PyTorch トレーニングスクリプトで SMDDP ライブラリを使用する](data-parallel-modify-sdp-pt.md)
+ [PyTorch Lightning トレーニングスクリプトで SMDDP ライブラリを使用する](data-parallel-modify-sdp-pt-lightning.md)
+ [TensorFlow トレーニングスクリプトで SMDDP ライブラリを使用する (非推奨)](data-parallel-modify-sdp-tf2.md)

# PyTorch トレーニングスクリプトで SMDDP ライブラリを使用する
<a name="data-parallel-modify-sdp-pt"></a>

SageMaker AI 分散データ並列処理 (SMDDP) ライブラリ v1.4.0 以降は、[PyTorch の分散パッケージ](https://pytorch.org/tutorials/beginner/dist_overview.html)用のバックエンドオプションとして使用できます。SMDDP の `AllReduce` および `AllGather` 集合演算を使用するには、トレーニングスクリプトの冒頭で SMDDP ライブラリをインポートし、プロセスグループの初期化中に SMDDP を PyTorch 分散モジュールのバックエンドとして設定するだけで済みます。1 行でバックエンドを指定するだけで、ネイティブの PyTorch 分散モジュールはすべてそのまま残すことができ、トレーニングスクリプト全体を変更する必要はありません。次のコードスニペットは、PyTorch ベースの分散トレーニングパッケージ ([PyTorch Distributed Data Parallel (DDP)](https://pytorch.org/docs/stable/notes/ddp.html)、[PyTorch Fully Sharded Data Parallelism (FSDP)](https://pytorch.org/docs/stable/fsdp.html)、[DeepSpeed](https://github.com/microsoft/DeepSpeed)、[Megatron-DeepSpeed](https://github.com/microsoft/Megatron-DeepSpeed)) のバックエンドとして SMDDP ライブラリを使用する方法を示しています。

## PyTorch DDP または FSDP の場合
<a name="data-parallel-enable-for-ptddp-ptfsdp"></a>

プロセスグループを次のように初期化します。

```
import torch.distributed as dist
import smdistributed.dataparallel.torch.torch_smddp

dist.init_process_group(backend="smddp")
```

**注記**  
(PyTorch DDP ジョブのみ) `smddp` バックエンドは、現時点では、`torch.distributed.new_group()` API によるサブプロセスグループの作成には対応していません。`smddp` バックエンドは、`NCCL` や `Gloo` などの他のプロセスグループバックエンドと同時に使用することはできません。

## DeepSpeed または Megatron-DeepSpeed の場合
<a name="data-parallel-enable-for-deepspeed"></a>

プロセスグループを次のように初期化します。

```
import deepspeed
import smdistributed.dataparallel.torch.torch_smddp

deepspeed.init_distributed(dist_backend="smddp")
```

**注記**  
SMDDP の `AllGather` を [SageMaker Python SDK を使用して SMDDP で分散トレーニングジョブを開始する](data-parallel-use-api.md) の `mpirun` ベースのランチャー (`smdistributed` と `pytorchddp`) で使用するには、トレーニングスクリプトで次の環境変数を設定する必要があります。  

```
export SMDATAPARALLEL_OPTIMIZE_SDP=true
```

PyTorch FSDP トレーニングスクリプトの記述に関する一般的なガイダンスについては、PyTorch ドキュメントの「[Advanced Model Training with Fully Sharded Data Parallel (FSDP)](https://pytorch.org/tutorials/intermediate/FSDP_adavnced_tutorial.html)」を参照してください。

PyTorch DDP トレーニングスクリプトの記述に関する一般的なガイダンスについては、PyTorch ドキュメントの「[Getting started with distributed data parallel](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html)」を参照してください。

トレーニングスクリプトの調整が完了したら、[SageMaker Python SDK を使用して SMDDP で分散トレーニングジョブを開始する](data-parallel-use-api.md) に進みます。

# PyTorch Lightning トレーニングスクリプトで SMDDP ライブラリを使用する
<a name="data-parallel-modify-sdp-pt-lightning"></a>

[PyTorch Lightning](https://pytorch-lightning.readthedocs.io/en/latest/starter/introduction.html) トレーニングスクリプトを持ち込んで SageMaker AI で分散データ並列トレーニングジョブを実行したい場合は、トレーニングスクリプトに最小限の変更を加えてトレーニングジョブを実行できます。必要な変更として、例えば、`smdistributed.dataparallel` ライブラリの PyTorch モジュールをインポートし、SageMaker トレーニングツールキットによって事前設定された SageMaker AI 環境変数を受け入れるように PyTorch Lightning の環境変数を設定して、プロセスグループのバックエンドを `"smddp"` に設定して SMDDP をライブラリを有効にします。詳細については、コード例を使って各ステップごとに説明します。

**注記**  
PyTorch Lightning サポートは SageMaker AI データ並列ライブラリ v1.5.0 以降で利用できます。

## PyTorch Lightning == v2.1.0 および PyTorch == 2.0.1
<a name="smddp-pt-201-lightning-210"></a>

1. `pytorch_lightning` ライブラリと `smdistributed.dataparallel.torch` モジュールをインポートします。

   ```
   import lightning as pl
   import smdistributed.dataparallel.torch.torch_smddp
   ```

1. [LightningEnvironment](https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.plugins.environments.LightningEnvironment.html) をインスタンス化します。

   ```
   from lightning.fabric.plugins.environments.lightning import LightningEnvironment
   
   env = LightningEnvironment()
   env.world_size = lambda: int(os.environ["WORLD_SIZE"])
   env.global_rank = lambda: int(os.environ["RANK"])
   ```

1. **PyTorch DDP の場合** – [DDPStrategy](https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.DDPStrategy.html) クラスのオブジェクトを作成し、`process_group_backend` には `"smddp"`、`accelerator` には `"gpu"` を指定します。このオブジェクトを [Trainer](https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html) クラスに渡します。

   ```
   import lightning as pl
   from lightning.pytorch.strategies import DDPStrategy
   
   ddp = DDPStrategy(
       cluster_environment=env, 
       process_group_backend="smddp", 
       accelerator="gpu"
   )
   
   trainer = pl.Trainer(
       max_epochs=200, 
       strategy=ddp, 
       devices=num_gpus, 
       num_nodes=num_nodes
   )
   ```

   **PyTorch FSDP の場合** – [FSDPStrategy](https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.FSDPStrategy.html) クラスのオブジェクトを (選択した[ラッピングポリシー](https://pytorch.org/docs/stable/fsdp.html)で) 作成し、`process_group_backend` には `"smddp"`、`accelerator` には `"gpu"` を指定します。このオブジェクトを [Trainer](https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html) クラスに渡します。

   ```
   import lightning as pl
   from lightning.pytorch.strategies import FSDPStrategy
   
   from functools import partial
   from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy
   
   policy = partial(
       size_based_auto_wrap_policy, 
       min_num_params=10000
   )
   
   fsdp = FSDPStrategy(
       auto_wrap_policy=policy,
       process_group_backend="smddp", 
       cluster_environment=env
   )
   
   trainer = pl.Trainer(
       max_epochs=200, 
       strategy=fsdp, 
       devices=num_gpus, 
       num_nodes=num_nodes
   )
   ```

トレーニングスクリプトの調整が完了したら、[SageMaker Python SDK を使用して SMDDP で分散トレーニングジョブを開始する](data-parallel-use-api.md) に進みます。

**注記**  
[SageMaker Python SDK を使用して SMDDP で分散トレーニングジョブを開始する](data-parallel-use-api.md) で SageMaker AI PyTorch 推定器を作成してトレーニングジョブリクエストを送信する場合、SageMaker AI PyTorch トレーニングコンテナに `pytorch-lightning` および `lightning-bolts` をインストールするために `requirements.txt` を指定する必要があります。  

```
# requirements.txt
pytorch-lightning
lightning-bolts
```
トレーニングスクリプトやジョブ送信と一緒に `requirements.txt` ファイルを配置するソースディレクトリを指定する方法の詳細については、Amazon SageMaker AI Python SDK ドキュメントの「[Using third-party libraries](https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/using_pytorch.html#id12)」を参照してください。

# TensorFlow トレーニングスクリプトで SMDDP ライブラリを使用する (非推奨)
<a name="data-parallel-modify-sdp-tf2"></a>

**重要**  
SMDDP ライブラリは TensorFlow のサポートを終了し、v2.11.0 より新しい TensorFlow の DLC では使用できなくなりました。SMDDP ライブラリがインストールされている以前の TensorFlow DLC を調べるには、「[サポートされるフレームワーク](distributed-data-parallel-support.md#distributed-data-parallel-supported-frameworks)」を参照してください。

次のステップは、TensorFlow トレーニングスクリプトを変更して SageMaker AI の分散データ並列ライブラリを利用する方法を示しています。  

ライブラリの API は Horovod の API と同様になるように設計されています。ライブラリが TensorFlow に提供する各 API の詳細については、[SageMaker AI 分散データ並列 TensorFlow API ドキュメント](https://sagemaker.readthedocs.io/en/stable/api/training/smd_data_parallel.html#api-documentation)を参照してください。

**注記**  
SageMaker AI 分散データ並列は、`tf.keras` モジュールを除く `tf` コアモジュールで構成される TensorFlow トレーニングスクリプトに適応可能です。SageMaker AI 分散データ並列は、Keras 実装を使った TensorFlow はサポートしていません。

**注記**  
SageMaker AI の分散データ並列処理ライブラリは、Automatic Mixed Precision (AMP) に標準対応しています。AMP を有効にするには、トレーニングスクリプトにフレームワークレベルの変更を加える以外、特別なアクションは必要ありません。勾配が FP16 の場合、SageMaker AI データ並列処理ライブラリは、FP16 で `AllReduce` オペレーションを実行します。トレーニングスクリプトへの AMP API の実装に関する詳細は、次のリソースを参照してください。  
「[Frameworks - TensorFlow](https://docs.nvidia.com/deeplearning/performance/mixed-precision-training/index.html#tensorflow)」(「NVIDIA Deep Learning Performance ドキュメント」)
[深層学習の自動混合精度](https://developer.nvidia.com/automatic-mixed-precision) (NVIDIA デベロッパードキュメント)**
[TensorFlow 混合精度 API](https://www.tensorflow.org/guide/mixed_precision) (TensorFlow ドキュメント)**

1. ライブラリの TensorFlow クライアントをインポートし、初期化します。

   ```
   import smdistributed.dataparallel.tensorflow as sdp 
   sdp.init()
   ```

1. 各 GPU を 1 つの `smdistributed.dataparallel` プロセスに `local_rank` を付けて固定します。これは、特定のノード内のプロセスの相対ランクを参照します。`sdp.tensorflow.local_rank()` API により、デバイスのローカルランクを取得できます。リーダーノードはランク 0 で、ワーカーノードはランク 1、2、3 などとなります。これは、次のコードブロックで `sdp.local_rank()` として呼び出されます。`set_memory_growth` は SageMaker AI 分散とは直接関係ありませんが、TensorFlow を使用した分散トレーニングのために設定する必要があります。

   ```
   gpus = tf.config.experimental.list_physical_devices('GPU')
   for gpu in gpus:
       tf.config.experimental.set_memory_growth(gpu, True)
   if gpus:
       tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')
   ```

1. 学習レートをワーカー数でスケールします。`sdp.tensorflow.size()` API により、クラスター内のワーカー数を取得できます。これは、次のコードブロックで `sdp.size()` として呼び出されます。

   ```
   learning_rate = learning_rate * sdp.size()
   ```

1. ライブラリの `DistributedGradientTape` を使用して、トレーニング中の `AllReduce` オペレーションを最適化します。これは `tf.GradientTape` をラップします。  

   ```
   with tf.GradientTape() as tape:
         output = model(input)
         loss_value = loss(label, output)
       
   # SageMaker AI data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape
   tape = sdp.DistributedGradientTape(tape)
   ```

1. 初期モデル変数をリーダーノード (ランク 0) からすべてのワーカーノード (ランク 1～n) にブロードキャストします。これは、すべてのワーカーランクにわたる一貫した初期化を保証するために必要です。モデル変数とオプティマイザ変数が初期化された後、`sdp.tensorflow.broadcast_variables` API を使用します。これは、次のコードブロックで `sdp.broadcast_variables()` として呼び出されます。

   ```
   sdp.broadcast_variables(model.variables, root_rank=0)
   sdp.broadcast_variables(opt.variables(), root_rank=0)
   ```

1. 最後に、チェックポイントをリーダーノードのみに保存するようにスクリプトを変更します。リーダーノードには同期されたモデルがあります。これにより、ワーカーノードがチェックポイントを上書きしてチェックポイントを破損する可能性を回避できます。

   ```
   if sdp.rank() == 0:
       checkpoint.save(checkpoint_dir)
   ```

次に、ライブラリを使用した分散トレーニングの TensorFlow トレーニングスクリプトの例を示します。

```
import tensorflow as tf

# SageMaker AI data parallel: Import the library TF API
import smdistributed.dataparallel.tensorflow as sdp

# SageMaker AI data parallel: Initialize the library
sdp.init()

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    # SageMaker AI data parallel: Pin GPUs to a single library process
    tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')

# Prepare Dataset
dataset = tf.data.Dataset.from_tensor_slices(...)

# Define Model
mnist_model = tf.keras.Sequential(...)
loss = tf.losses.SparseCategoricalCrossentropy()

# SageMaker AI data parallel: Scale Learning Rate
# LR for 8 node run : 0.000125
# LR for single node run : 0.001
opt = tf.optimizers.Adam(0.000125 * sdp.size())

@tf.function
def training_step(images, labels, first_batch):
    with tf.GradientTape() as tape:
        probs = mnist_model(images, training=True)
        loss_value = loss(labels, probs)

    # SageMaker AI data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape
    tape = sdp.DistributedGradientTape(tape)

    grads = tape.gradient(loss_value, mnist_model.trainable_variables)
    opt.apply_gradients(zip(grads, mnist_model.trainable_variables))

    if first_batch:
       # SageMaker AI data parallel: Broadcast model and optimizer variables
       sdp.broadcast_variables(mnist_model.variables, root_rank=0)
       sdp.broadcast_variables(opt.variables(), root_rank=0)

    return loss_value

...

# SageMaker AI data parallel: Save checkpoints only from master node.
if sdp.rank() == 0:
    checkpoint.save(checkpoint_dir)
```

トレーニングスクリプトの調整が完了したら、「[SageMaker Python SDK を使用して SMDDP で分散トレーニングジョブを開始する](data-parallel-use-api.md)」に進みます。