

# Adapting your training script to use the SMDDP collective operations
<a name="data-parallel-modify-sdp-select-framework"></a>

The training script examples provided in this section are simplified and highlight only the required changes to enable the SageMaker AI distributed data parallelism (SMDDP) library in your training script. For end-to-end Jupyter notebook examples that demonstrate how to run a distributed training job with the SMDDP library, see [Amazon SageMaker AI data parallelism library examples](distributed-data-parallel-v2-examples.md).

**Topics**
+ [Use the SMDDP library in your PyTorch training script](data-parallel-modify-sdp-pt.md)
+ [Use the SMDDP library in your PyTorch Lightning training script](data-parallel-modify-sdp-pt-lightning.md)
+ [Use the SMDDP library in your TensorFlow training script (deprecated)](data-parallel-modify-sdp-tf2.md)

# Use the SMDDP library in your PyTorch training script
<a name="data-parallel-modify-sdp-pt"></a>

Starting from the SageMaker AI distributed data parallelism (SMDDP) library v1.4.0, you can use the library as a backend option for the [PyTorch distributed package](https://pytorch.org/tutorials/beginner/dist_overview.html). To use the SMDDP `AllReduce` and `AllGather` collective operations, you only need to import the SMDDP library at the beginning of your training script and set SMDDP as the backend of PyTorch distributed modules during process group initialization. With this single line of backend specification, you can keep all the native PyTorch distributed modules and the rest of your training script unchanged. The following code snippets show how to use the SMDDP library as the backend of PyTorch-based distributed training packages: [PyTorch distributed data parallel (DDP)](https://pytorch.org/docs/stable/notes/ddp.html), [PyTorch fully sharded data parallelism (FSDP)](https://pytorch.org/docs/stable/fsdp.html), [DeepSpeed](https://github.com/microsoft/DeepSpeed), and [Megatron-DeepSpeed](https://github.com/microsoft/Megatron-DeepSpeed).

## For PyTorch DDP or FSDP
<a name="data-parallel-enable-for-ptddp-ptfsdp"></a>

Initialize the process group as follows.

```
import torch.distributed as dist
import smdistributed.dataparallel.torch.torch_smddp

dist.init_process_group(backend="smddp")
```

**Note**  
(For PyTorch DDP jobs only) The `smddp` backend currently does not support creating subprocess groups with the `torch.distributed.new_group()` API. You also cannot use the `smddp` backend concurrently with other process group backends such as `NCCL` and `Gloo`.
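After the process group is initialized with the `smddp` backend, the rest of a DDP script is unchanged native PyTorch. The following is a minimal sketch, not an official example: the `try`/`except` guard and the single-process environment defaults exist only so the sketch can run locally on CPU; in a real SageMaker training job, the launcher presets these variables and the `smddp` backend is available in the DLC.

```
import os
import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

# Importing the SMDDP module registers "smddp" as a process group backend.
# The guard is only for illustration, so the sketch also runs outside a
# SageMaker DLC where smdistributed is not installed.
try:
    import smdistributed.dataparallel.torch.torch_smddp  # noqa: F401
    backend = "smddp"
except ImportError:
    backend = "gloo"  # CPU fallback for local experimentation

# In a real job the launcher presets these; single-process defaults here.
os.environ.setdefault("RANK", "0")
os.environ.setdefault("WORLD_SIZE", "1")
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")

dist.init_process_group(backend=backend)

# Wrap the model with native PyTorch DDP; nothing else changes for SMDDP.
model = nn.Linear(16, 4)
if torch.cuda.is_available():
    local_rank = int(os.environ.get("LOCAL_RANK", "0"))
    torch.cuda.set_device(local_rank)
    model = DDP(model.cuda(), device_ids=[local_rank])
else:
    model = DDP(model)

# Gradients are averaged across ranks via AllReduce during backward().
model(torch.randn(8, 16)).sum().backward()
```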

## For DeepSpeed or Megatron-DeepSpeed
<a name="data-parallel-enable-for-deepspeed"></a>

Initialize the process group as follows.

```
import deepspeed
import smdistributed.dataparallel.torch.torch_smddp

deepspeed.init_distributed(dist_backend="smddp")
```
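Beyond the backend setting, the rest of the DeepSpeed workflow is unchanged: you still drive training through `deepspeed.initialize` with a JSON config. The following is a minimal, hypothetical config fragment for illustration only; the keys are standard DeepSpeed options, but the values are placeholders, not tuned recommendations.

```
{
  "train_batch_size": 32,
  "fp16": {
    "enabled": true
  },
  "zero_optimization": {
    "stage": 2
  }
}
```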

**Note**  
To use SMDDP `AllGather` with the `mpirun`-based launchers (`smdistributed` and `pytorchddp`) in [Launching distributed training jobs with SMDDP using the SageMaker Python SDK](data-parallel-use-api.md), you also need to set the following environment variable in your training script.  

```
export SMDATAPARALLEL_OPTIMIZE_SDP=true
```

For general guidance on writing a PyTorch FSDP training script, see [Advanced Model Training with Fully Sharded Data Parallel (FSDP)](https://pytorch.org/tutorials/intermediate/FSDP_adavnced_tutorial.html) in the PyTorch documentation.

For general guidance on writing a PyTorch DDP training script, see [Getting started with distributed data parallel](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html) in the PyTorch documentation.

After you have completed adapting your training script, proceed to [Launching distributed training jobs with SMDDP using the SageMaker Python SDK](data-parallel-use-api.md).

# Use the SMDDP library in your PyTorch Lightning training script
<a name="data-parallel-modify-sdp-pt-lightning"></a>

If you want to bring your [PyTorch Lightning](https://pytorch-lightning.readthedocs.io/en/latest/starter/introduction.html) training script and run a distributed data parallel training job in SageMaker AI, you can do so with minimal changes to your training script. The necessary changes include the following: import the `smdistributed.dataparallel` library’s PyTorch modules, set up the environment variables so that PyTorch Lightning accepts the SageMaker AI environment variables preset by the SageMaker training toolkit, and activate the SMDDP library by setting the process group backend to `"smddp"`. The following instructions break down these steps with code examples.

**Note**  
The PyTorch Lightning support is available in the SageMaker AI data parallel library v1.5.0 and later.

## PyTorch Lightning == v2.1.0 and PyTorch == 2.0.1
<a name="smddp-pt-201-lightning-210"></a>

1. Import the `lightning` library and the `smdistributed.dataparallel.torch` modules.

   ```
   import lightning as pl
   import smdistributed.dataparallel.torch.torch_smddp
   ```

1. Instantiate the [LightningEnvironment](https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.plugins.environments.LightningEnvironment.html).

   ```
   import os
   
   from lightning.fabric.plugins.environments.lightning import LightningEnvironment
   
   env = LightningEnvironment()
   env.world_size = lambda: int(os.environ["WORLD_SIZE"])
   env.global_rank = lambda: int(os.environ["RANK"])
   ```

1. **For PyTorch DDP** – Create an object of the [DDPStrategy](https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.DDPStrategy.html) class with `"smddp"` for `process_group_backend` and `"gpu"` for `accelerator`, and pass that to the [Trainer](https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html) class.

   ```
   import lightning as pl
   from lightning.pytorch.strategies import DDPStrategy
   
   ddp = DDPStrategy(
       cluster_environment=env, 
       process_group_backend="smddp", 
       accelerator="gpu"
   )
   
   trainer = pl.Trainer(
       max_epochs=200, 
       strategy=ddp, 
       devices=num_gpus, 
       num_nodes=num_nodes
   )
   ```

   **For PyTorch FSDP** – Create an object of the [FSDPStrategy](https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.FSDPStrategy.html) class (with a [wrapping policy](https://pytorch.org/docs/stable/fsdp.html) of your choice) with `"smddp"` for `process_group_backend`, and pass that to the [Trainer](https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html) class.

   ```
   import lightning as pl
   from lightning.pytorch.strategies import FSDPStrategy
   
   from functools import partial
   from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy
   
   policy = partial(
       size_based_auto_wrap_policy, 
       min_num_params=10000
   )
   
   fsdp = FSDPStrategy(
       auto_wrap_policy=policy,
       process_group_backend="smddp", 
       cluster_environment=env
   )
   
   trainer = pl.Trainer(
       max_epochs=200, 
       strategy=fsdp, 
       devices=num_gpus, 
       num_nodes=num_nodes
   )
   ```
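The `LightningEnvironment` hooks shown earlier simply read process-topology values that the SageMaker training toolkit presets for every worker process. The following stdlib-only sketch shows what those callables resolve to; the `WORLD_SIZE` and `RANK` values here are hypothetical stand-ins for what the toolkit sets in a real job.

```
import os

# Hypothetical values for illustration; in a SageMaker job the training
# toolkit presets these for each process (e.g., 2 nodes x 8 GPUs).
os.environ["WORLD_SIZE"] = "16"
os.environ["RANK"] = "5"

# The same callables assigned to env.world_size and env.global_rank:
world_size = lambda: int(os.environ["WORLD_SIZE"])
global_rank = lambda: int(os.environ["RANK"])

print(world_size(), global_rank())  # → 16 5
```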

After you have completed adapting your training script, proceed to [Launching distributed training jobs with SMDDP using the SageMaker Python SDK](data-parallel-use-api.md). 

**Note**  
When you construct a SageMaker AI PyTorch estimator and submit a training job request in [Launching distributed training jobs with SMDDP using the SageMaker Python SDK](data-parallel-use-api.md), you need to provide `requirements.txt` to install `pytorch-lightning` and `lightning-bolts` in the SageMaker AI PyTorch training container.  

```
# requirements.txt
pytorch-lightning
lightning-bolts
```
For more information about specifying the source directory to place the `requirements.txt` file along with your training script and a job submission, see [Using third-party libraries](https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/using_pytorch.html#id12) in the *Amazon SageMaker AI Python SDK documentation*.

# Use the SMDDP library in your TensorFlow training script (deprecated)
<a name="data-parallel-modify-sdp-tf2"></a>

**Important**  
The SMDDP library discontinued support for TensorFlow and is no longer available in DLCs for TensorFlow later than v2.11.0. To find previous TensorFlow DLCs with the SMDDP library installed, see [Supported frameworks](distributed-data-parallel-support.md#distributed-data-parallel-supported-frameworks).

The following steps show you how to modify a TensorFlow training script to use the SageMaker AI distributed data parallel library.

The library APIs are designed to be similar to Horovod APIs. For additional details on each API that the library offers for TensorFlow, see the [SageMaker AI distributed data parallel TensorFlow API documentation](https://sagemaker.readthedocs.io/en/stable/api/training/smd_data_parallel.html#api-documentation).

**Note**  
SageMaker AI distributed data parallel works with TensorFlow training scripts composed of `tf` core modules, except for `tf.keras` modules. SageMaker AI distributed data parallel does not support TensorFlow with Keras implementations.

**Note**  
The SageMaker AI distributed data parallelism library supports Automatic Mixed Precision (AMP) out of the box. No extra action is needed to enable AMP other than the framework-level modifications to your training script. If gradients are in FP16, the SageMaker AI data parallelism library runs its `AllReduce` operation in FP16. For more information about implementing AMP APIs to your training script, see the following resources:  
[Frameworks - TensorFlow](https://docs.nvidia.com/deeplearning/performance/mixed-precision-training/index.html#tensorflow) in the *NVIDIA Deep Learning Performance documentation*
[Automatic Mixed Precision for Deep Learning](https://developer.nvidia.com/automatic-mixed-precision) in the *NVIDIA Developer Docs*
[TensorFlow mixed precision APIs](https://www.tensorflow.org/guide/mixed_precision) in the *TensorFlow documentation*

1. Import the library's TensorFlow client and initialize it.

   ```
   import smdistributed.dataparallel.tensorflow as sdp 
   sdp.init()
   ```

1. Pin each GPU to a single `smdistributed.dataparallel` process with `local_rank`—this refers to the relative rank of the process within a given node. The `sdp.tensorflow.local_rank()` API provides you with the local rank of the device. The leader node is rank 0, and the worker nodes are rank 1, 2, 3, and so on. This is invoked in the following code block as `sdp.local_rank()`. `set_memory_growth` is not directly related to SageMaker AI distributed, but must be set for distributed training with TensorFlow. 

   ```
   gpus = tf.config.experimental.list_physical_devices('GPU')
   for gpu in gpus:
       tf.config.experimental.set_memory_growth(gpu, True)
   if gpus:
       tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')
   ```

1. Scale the learning rate by the number of workers. The `sdp.tensorflow.size()` API provides you with the number of workers in the cluster. This is invoked in the following code block as `sdp.size()`. 

   ```
   learning_rate = learning_rate * sdp.size()
   ```

1. Use the library’s `DistributedGradientTape` to optimize `AllReduce` operations during training. This wraps `tf.GradientTape`.  

   ```
   with tf.GradientTape() as tape:
       output = model(input)
       loss_value = loss(label, output)

   # SageMaker AI data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape
   tape = sdp.DistributedGradientTape(tape)
   ```

1. Broadcast the initial model variables from the leader node (rank 0) to all the worker nodes (ranks 1 through n). This is needed to ensure a consistent initialization across all the worker ranks. Use the `sdp.tensorflow.broadcast_variables` API after the model and optimizer variables are initialized. This is invoked in the following code block as `sdp.broadcast_variables()`. 

   ```
   sdp.broadcast_variables(model.variables, root_rank=0)
   sdp.broadcast_variables(opt.variables(), root_rank=0)
   ```

1. Finally, modify your script to save checkpoints only on the leader node. The leader node has a synchronized model, and saving from a single rank also prevents the worker nodes from overwriting and possibly corrupting the checkpoints. 

   ```
   if sdp.rank() == 0:
       checkpoint.save(checkpoint_dir)
   ```

The following is an example TensorFlow training script for distributed training with the library.

```
import tensorflow as tf

# SageMaker AI data parallel: Import the library TF API
import smdistributed.dataparallel.tensorflow as sdp

# SageMaker AI data parallel: Initialize the library
sdp.init()

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    # SageMaker AI data parallel: Pin GPUs to a single library process
    tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')

# Prepare Dataset
dataset = tf.data.Dataset.from_tensor_slices(...)

# Define Model
mnist_model = tf.keras.Sequential(...)
loss = tf.losses.SparseCategoricalCrossentropy()

# SageMaker AI data parallel: Scale Learning Rate
# LR for 8 node run : 0.000125
# LR for single node run : 0.001
opt = tf.optimizers.Adam(0.000125 * sdp.size())

@tf.function
def training_step(images, labels, first_batch):
    with tf.GradientTape() as tape:
        probs = mnist_model(images, training=True)
        loss_value = loss(labels, probs)

    # SageMaker AI data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape
    tape = sdp.DistributedGradientTape(tape)

    grads = tape.gradient(loss_value, mnist_model.trainable_variables)
    opt.apply_gradients(zip(grads, mnist_model.trainable_variables))

    if first_batch:
        # SageMaker AI data parallel: Broadcast model and optimizer variables
        sdp.broadcast_variables(mnist_model.variables, root_rank=0)
        sdp.broadcast_variables(opt.variables(), root_rank=0)

    return loss_value

...

# SageMaker AI data parallel: Save checkpoints only from master node.
if sdp.rank() == 0:
    checkpoint.save(checkpoint_dir)
```

After you have completed adapting your training script, move on to [Launching distributed training jobs with SMDDP using the SageMaker Python SDK](data-parallel-use-api.md). 