

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# TensorFlow トレーニングスクリプトを変更する
<a name="model-parallel-customize-training-script-tf"></a>

このセクションでは、TensorFlow トレーニングスクリプトを変更して、自動パーティショニングおよび手動パーティショニング用に SageMaker モデル並列処理ライブラリを設定する方法を学習します。例として選んだ中には、モデルとデータのハイブリッド並列処理のための Horovod との統合例も含まれています。

**注記**  
ライブラリでサポートされている TensorFlow のバージョンを確認するには、「[サポートされているフレームワークと AWS リージョン](distributed-model-parallel-support.md)」を参照してください。

ライブラリを使うためにトレーニングスクリプトに加える必要のある変更については、「[TensorFlow による自動分割](#model-parallel-customize-training-script-tf-23)」を参照してください。

Horovod でモデルとデータのハイブリッド並列処理を使うようにトレーニングスクリプトを変更する方法については、「[ハイブリッドモデルおよびデータ並列処理のための TensorFlow および Horovod による自動分割](#model-parallel-customize-training-script-tf-2.3)」を参照してください。

手動パーティショニングを使用する場合は、「[TensorFlow による手動分割](#model-parallel-customize-training-script-tf-manual)」も確認してください。

次のトピックでは、TensorFlow モデルの自動パーティショニングおよび手動パーティショニング用の SageMaker のモデル並列処理ライブラリを設定するために使用できるトレーニングスクリプトの例を示します。

**注記**  
自動パーティショニングはデフォルトで有効になっています。特に指定がない限り、サンプルスクリプトでは自動パーティショニングを使用します。

**Topics**
+ [TensorFlow による自動分割](#model-parallel-customize-training-script-tf-23)
+ [ハイブリッドモデルおよびデータ並列処理のための TensorFlow および Horovod による自動分割](#model-parallel-customize-training-script-tf-2.3)
+ [TensorFlow による手動分割](#model-parallel-customize-training-script-tf-manual)
+ [サポートされていないフレームワーク機能](#model-parallel-tf-unsupported-features)

## TensorFlow による自動分割
<a name="model-parallel-customize-training-script-tf-23"></a>

SageMaker のモデル並列処理ライブラリを使用して TensorFlow モデルを実行するには、次のトレーニングスクリプトの変更が必要です。

1. ライブラリをインポートし、[https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) で初期化します。

1. Keras Model クラスの代わりに [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_tensorflow.html](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_tensorflow.html) から継承することで、Keras モデルを定義します。`smp.DistributedModel` オブジェクトの call メソッドからのモデル出力を返します。call メソッドから返されたテンソルはどれもモデル並列デバイス間でブロードキャストされるため、通信オーバーヘッドが発生します。したがって、call メソッド範囲外の不要なテンソル (中間アクティベーションなど) は返さないように注意してください。

1. `tf.Dataset.batch()` メソッドで `drop_remainder=True` を設定します。これは、バッチサイズが常にマイクロバッチ数で割り切れるようにするためです。

1. データパイプラインのランダムオペレーションを `smp.dp_rank()` を使ってシードします (例: `shuffle(ds, seed=smp.dp_rank())`)。これにより、異なるモデルパーティションを保持する GPU 間でデータサンプルの一貫性が確保されます。

1. フォワードおよびバックワードロジックをステップ関数に入れ、`smp.step` で修飾します。

1. `reduce_mean` などの [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) メソッドを使って、マイクロバッチ全体の出力に対して後処理を実行します。[https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) 関数には、`smp.DistributedModel` の出力に依存する戻り値が必要です。

1. 評価ステップがある場合は、同様にフォワードロジックを `smp.step` で修飾された関数内に配置し、[`StepOutput` API](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) を使って出力を後処理します。

SageMaker のモデル並列処理ライブラリ API の詳細については、「[API ドキュメント](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)」を参照してください。

次の Python スクリプトは、変更後のトレーニングスクリプトの例です。

```
import tensorflow as tf

# smdistributed: Import TF2.x API
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API 
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers

    def call(self, x, training=None):
        # define forward pass and return the model output

model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for images, labels in train_ds:
        loss = train_step(images, labels)
    accuracy = train_accuracy.result()
```

トレーニングスクリプトの準備が完了したら、[ステップ 2: SageMaker Python SDK を使用してトレーニングジョブを起動する](model-parallel-sm-sdk.md) に進んでください。ハイブリッドモデルとデータの並列トレーニングジョブを実行する場合は、次のセクションに進んでください。

## ハイブリッドモデルおよびデータ並列処理のための TensorFlow および Horovod による自動分割
<a name="model-parallel-customize-training-script-tf-2.3"></a>

SageMaker モデル並列処理ライブラリを Horovod と共に使用することで、モデルとデータのハイブリッド並列処理を行うことができます。ハイブリッド並列処理用にライブラリがモデルを分割する方法の詳細については、「[パイプライン並列処理 (PyTorch と TensorFlow で利用可能)](model-parallel-intro.md#model-parallel-intro-pp)」を参照してください。

このステップでは、トレーニングスクリプトを変更して、SageMaker モデル並列処理ライブラリを適応させる方法に焦点を当てます。

トレーニングスクリプトを適切に設定して、[ステップ 2: SageMaker Python SDK を使用してトレーニングジョブを起動する](model-parallel-sm-sdk.md) で設定するハイブリッド並列処理設定を適用するには、データ並列ランクとモデル並列ランクをそれぞれ自動的に検出するライブラリのヘルパー関数 `smp.dp_rank()` および `smp.mp_rank()` を使用します。

ライブラリがサポートするすべての MPI プリミティブを検索するには、『SageMaker Python SDK ドキュメント』の「[MPI の基礎](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#mpi-basics)」を参照してください。

スクリプトに必要な変更点は次のとおりです。
+ `hvd.allreduce` を追加する
+ Horovod の要求に応じて、最初のバッチの後に変数をブロードキャストする
+ データパイプラインのシャッフルやシャードのオペレーションを `smp.dp_rank()` を使ってシードする。

**注記**  
Horovod を使用するときは、トレーニングスクリプトで `hvd.init` を直接呼び出すことはできません。代わりに、[ステップ 2: SageMaker Python SDK を使用してトレーニングジョブを起動する](model-parallel-sm-sdk.md) の SageMaker Python SDK の `modelparallel` パラメータで、`"horovod"` を `True` に設定する必要があります。これにより、ライブラリはモデルパーティションのデバイス割り当てに基づいて Horovod を内部的に初期化できます。`hvd.init()` をトレーニングスクリプト内で直接呼び出すと、問題が発生する可能性があります。

**注記**  
トレーニングスクリプト内で `hvd.DistributedOptimizer` API を直接使用すると、API が黙示的に `AllReduce` オペレーションを `smp.step` 内に配置するため、トレーニングのパフォーマンスと速度が低下する可能性があります。Horovod でモデル並列処理ライブラリを使用する場合は、次の例で示すように、`smp.step` から返された勾配に対して `accumulate()` または `reduce_mean()` を呼び出した後に `hvd.allreduce` を直接呼び出すことをお勧めします。

SageMaker のモデル並列処理ライブラリ API の詳細については、「[API ドキュメント](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)」を参照してください。

```
import tensorflow as tf
import horovod.tensorflow as hvd

# smdistributed: Import TF2.x API 
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API 
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers

    def call(self, x, training=None):
        # define forward pass and return model outputs


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels, first_batch):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    # Horovod: AllReduce the accumulated gradients
    gradients = [hvd.allreduce(g.accumulate()) for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Horovod: Broadcast the variables after first batch 
    if first_batch:
        hvd.broadcast_variables(model.variables, root_rank=0)
        hvd.broadcast_variables(optimizer.variables(), root_rank=0)

    # smdistributed: Merge predictions across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()

    for batch, (images, labels) in enumerate(train_ds):
        loss = train_step(images, labels, tf.constant(batch == 0))
```

## TensorFlow による手動分割
<a name="model-parallel-customize-training-script-tf-manual"></a>

`smp.partition` コンテキストマネージャーを使って、特定のパーティションにオペレーションを配置します。どの `smp.partition` コンテキストにも配置されていないオペレーションは、`default_partition` に配置されます。SageMaker のモデル並列処理ライブラリ API の詳細については、「[API ドキュメント](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)」を参照してください。

```
import tensorflow as tf

# smdistributed: Import TF2.x API.
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches.
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API.
class MyModel(smp.DistributedModel):
    def __init__(self):
         # define layers

    def call(self, x):
        with smp.partition(0):
            x = self.layer0(x)
        with smp.partition(1):
            return self.layer1(x)


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for images, labels in train_ds:
        loss = train_step(images, labels)
    accuracy = train_accuracy.result()
```

## サポートされていないフレームワーク機能
<a name="model-parallel-tf-unsupported-features"></a>

次の TensorFlow 機能はライブラリではサポートされていません。
+ `tf.GradientTape()` は現在サポートされていません。代わりに `Optimizer.get_gradients()` または `Optimizer.compute_gradients()` を使って勾配を計算できます。
+ `tf.train.Checkpoint.restore()` API は現在サポートされていません。チェックポイントには、代わりに同じ API と機能を備えた `smp.CheckpointManager` を使用してください。`smp.CheckpointManager` によるチェックポイントの復元は、最初のステップの後で実行することに注意してください。