本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
修改 TensorFlow 訓練指令集
在本節中,您將學習如何修改 TensorFlow 訓練指令碼,以設定用於自動磁碟分割和手動磁碟分割的 SageMaker模型平行程式庫。這個範例選擇也包含與 Horovod 整合的範例,以用於混合模型和資料平行處理。
注意
若要尋找程式庫支援哪些 TensorFlow 版本,請參閱支援的架構與 AWS 區域。
有關使用程式庫前必須對訓練指令碼進行的修改,已列入 自動拆分 TensorFlow。
想了解如何修改訓練指令碼,以便將混合模型和資料平行處理與 Horovod 搭配使用,請參閱使用 TensorFlow 和 Horovod 進行自動化分割,適用於混合模型和資料平行處理。
若選擇手動磁碟分割,請參考手動分割 TensorFlow。
下列主題顯示訓練指令碼範例,您可以使用這些指令碼來設定 SageMaker自動磁碟分割和手動磁碟分割模型的模型平行程式庫。 TensorFlow
注意
自動分割預設為開啟。除非特別指定,否則範例指令碼都採用自動分割。
自動拆分 TensorFlow
若要執行具有模型平行程式庫 SageMaker的 TensorFlow 模型,需要進行下列訓練指令碼變更:
-
使用
smp.init()
匯入和初始化程式庫。 -
透過繼承自
smp.DistributedModel
而不是 Keras 模型類別,來定義一個 Keras 模型。從 smp.DistributedModel
物件的呼叫方法傳回模型輸出。請注意,從呼叫方法傳回的任何張量都將跨模型平行裝置廣播,造成通訊開銷增加,因此不需傳回在呼叫方法之外的非必要張量 (例如中繼啟動)。 -
在
tf.Dataset.batch()
方法中設定drop_remainder=True
。這是為了確保批次大小必然可以被微批次數量整除。 -
使用
smp.dp_rank()
(如shuffle(ds, seed=smp.dp_rank())
) 在 Data Pipeline 中植入隨機操作,確保存放不同模型分割的 GPU 上的資料範例保持一致。 -
將轉送和向後邏輯放在 Step Function 中,並使用
smp.step
進行裝飾。 -
使用
StepOutput
方法 (如 reduce_mean
) 對微批次的輸出上執行後處理。smp.step
函式的傳回值必須取決於 smp.DistributedModel
的輸出。 -
如果有評估步驟,採取類似將轉送邏輯放在
smp.step
-裝飾函式內的作法,並使用StepOutput
API對輸出執行後處理。
要了解有關模型並行性庫 API SageMaker 的更多信息,請參閱 API 文檔
下列 Python 指令碼是進行變更之後的訓練指令碼範例。
import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
如果您已完成訓練指令碼的準備,請繼續執行 步驟 2:使用開發套件啟動訓練 Job SageMaker。如果想要執行混合模型和資料平行訓練任務,請繼續至下一節。
使用 TensorFlow 和 Horovod 進行自動化分割,適用於混合模型和資料平行處理
您可以將 SageMaker 模型平行程度程式庫與 Horovod 搭配使用,以進行混合模型和資料平行處理原則。若要閱讀更多有關程式庫針對混合式平行處理分割模型方式的資訊,請參閱管線平行度 (可用於 PyTorch 和 TensorFlow)。
在此步驟中,我們會著重於如何修改訓練指令碼,以調整 SageMaker模型平行程度程式庫。
為正確設定訓練指令碼,以取得您在 步驟 2:使用開發套件啟動訓練 Job SageMaker 要設定的混合式平行處理組態,請使用程式庫的輔助函式 smp.dp_rank()
和 smp.mp_rank()
,即會分別自動偵測資料平行和模型平行的排名。
要查找庫支持的所有 MPI 原語,請參閱 SageMaker Python SDK 文檔中的 MPI 基礎知識
指令碼中所需的必要變更如下:
-
新增
hvd.allreduce
-
根據 Horovod 的要求,在第一批次後廣播變數。
-
在
smp.dp_rank()
的 Data Pipeline 中植入隨機顯示和/或碎片操作。
注意
當您使用 Horovod 時,不得在訓練指令碼中直接呼叫 hvd.init
。相反地,您必須 SageMaker Python True
中modelparallel
將步驟 2:使用開發套件啟動訓練 Job SageMaker. "horovod"
這讓程式庫可以基於模型分割的裝置指派狀況,在內部初始化 Horovod。直接在訓練指令碼中呼叫 hvd.init()
,可能會造成問題。
注意
直接在訓練指令碼中使用 hvd.DistributedOptimizer
API,可能會導致訓練成效不佳與速度減緩,因為 API 背景作業下會將 AllReduce
操作置於 smp.step
中。我們建議您在取得 smp.step
傳回的漸層上呼叫 accumulate()
或 reduce_mean()
之後直接呼叫 hvd.allreduce
,以搭配 Horovod 使用模型平行處理程式庫,如下列範例所示。
要了解有關模型並行性庫 API SageMaker 的更多信息,請參閱 API 文檔
import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))
手動分割 TensorFlow
使用 smp.partition
內容管理器,將操作指派至特定分割中。未放置在任何 smp.partition
內容中的任何操作都會放置在 default_partition
。要了解有關模型並行性庫 API SageMaker 的更多信息,請參閱 API 文檔
import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
不支援架構功能
資料庫不支援下列 TensorFlow 功能:
-
目前不支援
tf.GradientTape()
。您可以使用Optimizer.get_gradients()
或Optimizer.compute_gradients()
來運算漸層。 -
目前不支援
tf.train.Checkpoint.restore()
API。如為檢查點,請改為使用smp.CheckpointManager
提供相同的 API 和功能。請注意,smp.CheckpointManager
的檢查點還原應該在第一步驟後進行。