

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 修改 TensorFlow 訓練指令碼
<a name="model-parallel-customize-training-script-tf"></a>

在本節中，您將學習如何修改 TensorFlow 訓練指令碼，以針對自動磁碟分割和手動磁碟分割設定 SageMaker 模型平行處理程式庫。這個範例選擇也包含與 Horovod 整合的範例，以用於混合模型和資料平行處理。

**注意**  
若需有關程式庫支援的 TensorFlow 版本資訊，請參閱[支援的架構和 AWS 區域](distributed-model-parallel-support.md)。

有關使用程式庫前必須對訓練指令碼進行的修改，已列入 [使用 TensorFlow 自動化分割](#model-parallel-customize-training-script-tf-23)。

想了解如何修改訓練指令碼，以便將混合模型和資料平行處理與 Horovod 搭配使用，請參閱[運用 TensorFlow 和 Horovod 進行混合模型和資料平行處理的自動化分割。](#model-parallel-customize-training-script-tf-2.3)。

若選擇手動磁碟分割，請參考[使用 TensorFlow 手動分割](#model-parallel-customize-training-script-tf-manual)。

下列主題提供訓練指令碼範例，您可以使用這些指令碼來設定 SageMaker 模型平行處理程式庫，以進行自動分割和手動分割 TensorFlow 模型。

**注意**  
自動分割預設為開啟。除非特別指定，否則範例指令碼都採用自動分割。

**Topics**
+ [使用 TensorFlow 自動化分割](#model-parallel-customize-training-script-tf-23)
+ [運用 TensorFlow 和 Horovod 進行混合模型和資料平行處理的自動化分割。](#model-parallel-customize-training-script-tf-2.3)
+ [使用 TensorFlow 手動分割](#model-parallel-customize-training-script-tf-manual)
+ [不支援架構功能](#model-parallel-tf-unsupported-features)

## 使用 TensorFlow 自動化分割
<a name="model-parallel-customize-training-script-tf-23"></a>

若要使用 SageMaker 模型平行處理程式庫執行 TensorFlow 模型，必須變更以下訓練指令碼：

1. 使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) 匯入和初始化程式庫。

1. 透過繼承自 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_tensorflow.html](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_tensorflow.html) 而不是 Keras 模型類別，來定義一個 Keras 模型。從 `smp.DistributedModel` 物件的呼叫方法傳回模型輸出。請注意，從呼叫方法傳回的任何張量都將跨模型平行裝置廣播，造成通訊開銷增加，因此不需傳回在呼叫方法之外的非必要張量 (例如中繼啟動)。

1. 在 `tf.Dataset.batch()` 方法中設定 `drop_remainder=True`。這是為了確保批次大小必然可以被微批次數量整除。

1. 使用 `smp.dp_rank()` (如 `shuffle(ds, seed=smp.dp_rank())`) 在 Data Pipeline 中植入隨機操作，確保存放不同模型分割的 GPU 上的資料範例保持一致。

1. 將轉送和向後邏輯放在 Step Function 中，並使用 `smp.step` 進行裝飾。

1. 使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) 方法 (如 `reduce_mean`) 對微批次的輸出上執行後處理。[https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) 函式的傳回值必須取決於 `smp.DistributedModel` 的輸出。

1. 如果有評估步驟，採取類似將轉送邏輯放在 `smp.step`-裝飾函式內的作法，並使用 [`StepOutput` API](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) 對輸出執行後處理。

若要進一步了解 SageMaker 模型平行處理程式庫 API，請參閱 [API 文件](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)。

下列 Python 指令碼是進行變更之後的訓練指令碼範例。

```
import tensorflow as tf

# smdistributed: Import TF2.x API
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API 
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers

    def call(self, x, training=None):
        # define forward pass and return the model output

model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for images, labels in train_ds:
        loss = train_step(images, labels)
    accuracy = train_accuracy.result()
```

如果您已完成訓練指令碼的準備，請繼續執行 [步驟 2：使用 SageMaker Python SDK 啟動訓練任務](model-parallel-sm-sdk.md)。如果想要執行混合模型和資料平行訓練任務，請繼續至下一節。

## 運用 TensorFlow 和 Horovod 進行混合模型和資料平行處理的自動化分割。
<a name="model-parallel-customize-training-script-tf-2.3"></a>

您可以透過 SageMaker 模型平行處理程式庫搭配 Horovod 使用，以進行混合模型和資料平行處理。若要閱讀更多有關程式庫針對混合式平行處理分割模型方式的資訊，請參閱[管道平行處理 (適用於 PyTorch 和 TensorFlow)](model-parallel-intro.md#model-parallel-intro-pp)。

在此步驟中，我們將著重在介紹如何修改訓練指令碼，以適應 SageMaker 模型平行處理程式庫。

為正確設定訓練指令碼，以取得您在 [步驟 2：使用 SageMaker Python SDK 啟動訓練任務](model-parallel-sm-sdk.md) 要設定的混合式平行處理組態，請使用程式庫的輔助函式 `smp.dp_rank()` 和 `smp.mp_rank()`，即會分別自動偵測資料平行和模型平行的排名。

若要尋找程式庫支援的所有 MPI 基本，請參閱 SageMaker Python SDK 文件中的 [MPI 基本資訊](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#mpi-basics)。

指令碼中所需的必要變更如下：
+ 新增 `hvd.allreduce`
+ 根據 Horovod 的要求，在第一批次後廣播變數。
+ 在 `smp.dp_rank()` 的 Data Pipeline 中植入隨機顯示和/或碎片操作。

**注意**  
當您使用 Horovod 時，不得在訓練指令碼中直接呼叫 `hvd.init`。反之，您需要在 SageMaker Python SDK [步驟 2：使用 SageMaker Python SDK 啟動訓練任務](model-parallel-sm-sdk.md) 的 `modelparallel` 參數，將 `"horovod"` 設定為`True`。這讓程式庫可以基於模型分割的裝置指派狀況，在內部初始化 Horovod。直接在訓練指令碼中呼叫 `hvd.init()`，可能會造成問題。

**注意**  
直接在訓練指令碼中使用 `hvd.DistributedOptimizer` API，可能會導致訓練成效不佳與速度減緩，因為 API 背景作業下會將 `AllReduce` 操作置於 `smp.step` 中。我們建議您在取得 `smp.step` 傳回的漸層上呼叫 `accumulate()` 或 `reduce_mean()` 之後直接呼叫 `hvd.allreduce`，以搭配 Horovod 使用模型平行處理程式庫，如下列範例所示。

若要進一步了解 SageMaker 模型平行處理程式庫 API，請參閱 [API 文件](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)。

```
import tensorflow as tf
import horovod.tensorflow as hvd

# smdistributed: Import TF2.x API 
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API 
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers

    def call(self, x, training=None):
        # define forward pass and return model outputs


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels, first_batch):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    # Horovod: AllReduce the accumulated gradients
    gradients = [hvd.allreduce(g.accumulate()) for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Horovod: Broadcast the variables after first batch 
    if first_batch:
        hvd.broadcast_variables(model.variables, root_rank=0)
        hvd.broadcast_variables(optimizer.variables(), root_rank=0)

    # smdistributed: Merge predictions across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()

    for batch, (images, labels) in enumerate(train_ds):
        loss = train_step(images, labels, tf.constant(batch == 0))
```

## 使用 TensorFlow 手動分割
<a name="model-parallel-customize-training-script-tf-manual"></a>

使用 `smp.partition` 內容管理器，將操作指派至特定分割中。未放置在任何 `smp.partition` 內容中的任何操作都會放置在 `default_partition`。若要進一步了解 SageMaker 模型平行處理程式庫 API，請參閱 [API 文件](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)。

```
import tensorflow as tf

# smdistributed: Import TF2.x API.
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches.
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API.
class MyModel(smp.DistributedModel):
    def __init__(self):
         # define layers

    def call(self, x):
        with smp.partition(0):
            x = self.layer0(x)
        with smp.partition(1):
            return self.layer1(x)


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for images, labels in train_ds:
        loss = train_step(images, labels)
    accuracy = train_accuracy.result()
```

## 不支援架構功能
<a name="model-parallel-tf-unsupported-features"></a>

程式庫不支援下列 TensorFlow 功能：
+ 目前不支援 `tf.GradientTape()`。您可以使用 `Optimizer.get_gradients()` 或 `Optimizer.compute_gradients()` 來運算漸層。
+ 目前不支援 `tf.train.Checkpoint.restore()` API。如為檢查點，請改為使用 `smp.CheckpointManager` 提供相同的 API 和功能。請注意，`smp.CheckpointManager` 的檢查點還原應該在第一步驟後進行。