在 TensorFlow 训练脚本中使用 SMDDP 库(已弃用) - 亚马逊 SageMaker AI

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 TensorFlow 训练脚本中使用 SMDDP 库(已弃用)

重要

在 v2.11.0 之后,SMDDP 库已停止支持, TensorFlow 并且在 DLCs v2.11.0 TensorFlow 之后不再可用。要查找以前安装了 TensorFlow DLCs SMDDP 库的情况,请参阅。支持的框架

以下步骤向您展示如何修改 TensorFlow 训练脚本以利用 SageMaker AI 的分布式数据 parallel 库。 

该库 APIs 的设计类似于 Horovod APIs。有关该库提供的每个 API 的更多详细信息 TensorFlow,请参阅 SageMaker AI 分布式数据 parallel TensorFlow API 文档

注意

SageMaker AI 分布式数据 parallel 适用于由除tf.keras模块之外的tf核心模块组成的 TensorFlow 训练脚本。 SageMaker 人工智能分布式数据 parallel 不支持 Ker TensorFlow as 实现。

注意

SageMaker AI 分布式数据并行度库支持开箱即用的自动混合精度 (AMP)。除了对训练脚本进行框架级别的修改之外,您无需执行额外操作即可启用 AMP。如果有梯度 FP16,则 SageMaker AI 数据并行度库将在中运行其操作。AllReduce FP16有关在训练脚本中实现 AMP APIs 的更多信息,请参阅以下资源:

  1. 导入库的 TensorFlow 客户端并对其进行初始化。

    import smdistributed.dataparallel.tensorflow as sdp  sdp.init()
  2. 使用 local_rank 将每个 GPU 固定到一个 smdistributed.dataparallel 进程,这表示进程在给定节点中的相对秩。sdp.tensorflow.local_rank() API 向您提供设备的局部秩。领导节点的秩为 0,Worker 节点的秩为 1、2、3,依此类推。在以下代码块中将其调用为sdp.local_rank()set_memory_growth与分布式 SageMaker AI 没有直接关系,但必须将其设置为使用进行分布式训练 TensorFlow。

    gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus:     tf.config.experimental.set_memory_growth(gpu, True) if gpus:     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')
  3. 根据工作线程数扩展学习率。sdp.tensorflow.size() API 为您提供集群中工作线程的数量。以下代码块中将其作为 sdp.size() 调用。

    learning_rate = learning_rate * sdp.size()
  4. 在训练期间,使用库的 DistributedGradientTape 来优化 AllReduce 操作。这会包装 tf.GradientTape。 

    with tf.GradientTape() as tape:       output = model(input)       loss_value = loss(label, output)      # SageMaker AI data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape tape = sdp.DistributedGradientTape(tape)
  5. 将初始模型变量从领导节点(秩 0)广播到所有 Worker 节点(秩 1 到 n)。这是确保对所有工作线程秩进行一致的初始化所必需的。在模型和优化器变量初始化后使用 sdp.tensorflow.broadcast_variables API。以下代码块中将其作为 sdp.broadcast_variables() 调用。

    sdp.broadcast_variables(model.variables, root_rank=0) sdp.broadcast_variables(opt.variables(), root_rank=0)
  6. 最后,修改脚本,仅在领导节点上保存检查点。领导节点具有同步模型。这还可以避免 Worker 节点覆盖检查点并可能损坏检查点。

    if sdp.rank() == 0:     checkpoint.save(checkpoint_dir)

以下是使用库进行分布式 TensorFlow 训练的示例训练脚本。

import tensorflow as tf # SageMaker AI data parallel: Import the library TF API import smdistributed.dataparallel.tensorflow as sdp # SageMaker AI data parallel: Initialize the library sdp.init() gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus:     tf.config.experimental.set_memory_growth(gpu, True) if gpus:     # SageMaker AI data parallel: Pin GPUs to a single library process     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU') # Prepare Dataset dataset = tf.data.Dataset.from_tensor_slices(...) # Define Model mnist_model = tf.keras.Sequential(...) loss = tf.losses.SparseCategoricalCrossentropy() # SageMaker AI data parallel: Scale Learning Rate # LR for 8 node run : 0.000125 # LR for single node run : 0.001 opt = tf.optimizers.Adam(0.000125 * sdp.size()) @tf.function def training_step(images, labels, first_batch):     with tf.GradientTape() as tape:         probs = mnist_model(images, training=True)         loss_value = loss(labels, probs)     # SageMaker AI data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape     tape = sdp.DistributedGradientTape(tape)     grads = tape.gradient(loss_value, mnist_model.trainable_variables)     opt.apply_gradients(zip(grads, mnist_model.trainable_variables))     if first_batch:        # SageMaker AI data parallel: Broadcast model and optimizer variables        sdp.broadcast_variables(mnist_model.variables, root_rank=0)        sdp.broadcast_variables(opt.variables(), root_rank=0)     return loss_value ... # SageMaker AI data parallel: Save checkpoints only from master node. if sdp.rank() == 0:     checkpoint.save(checkpoint_dir)

调整完训练脚本后,请继续到使用 Python SageMaker SDK 使用 SMDDP 启动分布式训练作业