對具有模型並行性的模型執行檢查點和微調 - Amazon SageMaker

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

對具有模型並行性的模型執行檢查點和微調

SageMaker 模型平行程式庫提供檢查點 API,可儲存模型狀態和最佳化程式狀態 (依不同模型平行處理原則策略分割),並從您想要重新開始訓練和微調的位置載入連續訓練的檢查點。這些 API 還支援部分或全部儲存模型和最佳化工具狀態的選項。

對分散式模型進行檢查點

根據和和您使用的 SageMaker 模型平行程式庫版本 PyTorch 之 TensorFlow 間的架構,選擇下列其中一個主題。

檢查點分散式 PyTorch 模型 (適用於模 SageMaker 型平行程式庫 v1.10.0 及更新版本)

SageMaker 模型平行程式庫提供檢查點 API,以儲存和載入分散式模型狀態及其最佳化器狀態的完整或部分檢查點。

注意

如果您使用 PyTorch 且 SageMaker 模型平行程式庫 v1.10.0 或更新版本,建議使用此檢查點方法。

部分檢查點

若要儲存使用模型平行處理訓練的模型檢查點,請使用 smdistributed.modelparallel.torch.save_checkpointAPI,並將部分檢查點選項設定為 true (partial=True)。這會個別儲存每個模型分割區。除了模型和最佳化工具狀態之外,您還可以透過 user_content 引數儲存任何其他自訂資料。檢查點模型、最佳化工具和使用者內容都儲存為個別檔案。save_checkpoint API 呼叫會以下列結構建立檢查點資料夾。

- path - ${tag}_partial (folder for partial checkpoints) - model_rankinfo.pt - optimizer_rankinfo.pt - fp16_states_rankinfo.pt - user_content.pt - $tag (checkpoint file for full checkpoints) - user_content_$tag (user_content file for full checkpoints) - newest (a file that indicates the newest checkpoint)

若要從部分檢查點繼續訓練,請將 smdistributed.modelparallel.torch.resume_from_checkpoint API 與 partial=True 搭配使用,並指定檢查點目錄和儲存部分檢查點時使用的標籤。請注意,模型權重的實際載入發生在模型分割之後,在 smdistributed.modelparallel.torch.step 裝飾訓練 Step Function 初次執行期間。

儲存部分檢查點時,程式庫也會將模型分割區決定儲存為具有 .pt 副檔名的檔案。相反地,從部分檢查點恢復時,程式庫會將分割決定檔案載入在一起。一旦已載入分割區決定,就無法變更分割區。

下列程式碼片段顯示如何在 PyTorch訓練指令碼中設定檢查點 API。

import smdistributed.modelparallel.torch as smp model = ... model = smp.DistributedModel(model) optimizer = ... optimizer = smp.DistributedOptimizer(optimizer) user_content = ... # additional custom data checkpoint_path = "/opt/ml/checkpoint/model_parallel" # Save a checkpoint. smp.save_checkpoint( path=checkpoint_path, tag=f"total_steps{total_steps}", partial=True, model=model, optimizer=optimizer, user_content=user_content num_kept_partial_checkpoints=5 ) # Load a checkpoint. # This automatically loads the most recently saved checkpoint. smp_checkpoint = smp.resume_from_checkpoint( path=checkpoint_path, partial=True )

完整檢查點

若要儲存最終模型成品以供推論,請使用 smdistributed.modelparallel.torch.save_checkpoint API 搭配 partial=False,該 API 結合模型分割區以建立單一模型成品。請注意,這不會結合最佳化工具狀態。

若要初始化具有特定權重的訓練,只要提供一個完整模型檢查點,您可以使用 smdistributed.modelparallel.torch.resume_from_checkpoint API 搭配 partial=False。請注意,這不會載入最佳化工具狀態。

注意

一般而言,使用張量平行處理,state_dict 必須在原始模型實作和 DistributedModel 實作之間進行轉譯。或者,您可以為 smdistributed.modelparallel.torch.resume_from_checkpoint 提供 state_dict 轉譯函數作為引數。但是,對於 開箱即用的支援模型,程式庫會自動處理此轉譯。

下列程式碼會示範如何使用檢查點 API 來完整檢查點以 PyTorch 模型平行處理原則訓練的模型。

import smdistributed.modelparallel.torch as smp model = ... model = smp.DistributedModel(model) optimizer = ... optimizer = smp.DistributedOptimizer(optimizer) user_content = ... # additional custom data checkpoint_path = "/opt/ml/checkpoint/model_parallel" # Save a checkpoint. smp.save_checkpoint( path=checkpoint_path, tag=f"total_steps{total_steps}", partial=False, model=model, optimizer=optimizer, user_content=user_content num_kept_partial_checkpoints=5 ) # Load a checkpoint. # This automatically loads the most recently saved checkpoint. smp_checkpoint = smp.resume_from_checkpoint( path=checkpoint_path, partial=False )

檢查點分散式 PyTorch 模型 (適用於 v1.6.0 和 v1.9.0 之間的 SageMaker 模型平行程式庫)

SageMaker 模型平行程式庫提供 Python 函數,用於儲存具有張量平行性的訓練工作的部分或完整檢查點。下列程序顯示當您使用張量平行處理時,如何使用 smp.save()smp.load() 儲存及載入檢查點。

注意

如果您使用 PyTorch、和 v1.6.0 和 v1.9.0 之間的 SageMaker 模型平行處理原則程式庫張量平行處理,建議使用此檢查點方法。

  1. 準備模型物件,並使用程式庫包裝函數 smp.DistributedModel() 包裝。

    model = MyModel(...) model = smp.DistributedModel(model)
  2. 準備模型的最佳化工具。一組模型參數是最佳化工具函數所需的可迭代引數。若要準備一組模型參數,您必須處理 model.parameters() 以將不重複的 ID 指派給個別模型參數。

    如果模型參數可迭代中存在具有重複 ID 的參數,則載入檢查點最佳化工具狀態將失敗。若要為最佳化工具建立具有不重複的 ID 的模型參數可迭代,請參閱以下內容:

    unique_params = [] unique_params_set = set() for p in model.parameters(): if p not in unique_params_set: unique_params.append(p) unique_params_set.add(p) del unique_params_set optimizer = MyOpt(unique_params, ...)
  3. 使用程式庫的包裝函數 smp.DistributedOptimizer() 包裝最佳化工具。

    optimizer = smp.DistributedOptimizer(optimizer)
  4. 使用 smp.save() 儲存模型和最佳化工具狀態。根據您想要儲存檢查點的方式,選擇下列兩個選項其中之一:

    • 選項 1:為單一 MP_GROUP 在每個 mp_rank 上儲存部分模型。

      model_dict = model.local_state_dict() # save a partial model opt_dict = optimizer.local_state_dict() # save a partial optimizer state # Save the dictionaries at rdp_rank 0 as a checkpoint if smp.rdp_rank() == 0: smp.save( {"model_state_dict": model_dict, "optimizer_state_dict": opt_dict}, f"/checkpoint.pt", partial=True, )

      透過張量平行處理,程式庫會儲存以下列格式命名的檢查點檔案:checkpoint.pt_{pp_rank}_{tp_rank}

      注意

      使用張量平行處理,確保將 if 陳述式設定為 if smp.rdp_rank() == 0 而不是 if smp.dp_rank() == 0。當最佳化工具狀態以張量平行處理進行分割時,所有縮減的資料平行排名都必須儲存自己的最佳化工具狀態分割。使用錯誤的 if 陳述式執行檢查點,可能導致訓練任務停滯。如需有關if smp.dp_rank() == 0不使用張量平行處理原則的詳細資訊,請參閱 SageMaker Python SDK 文件中的儲存和載入一般指示

    • 選項 2:儲存完整模型。

      if smp.rdp_rank() == 0: model_dict = model.state_dict(gather_to_rank0=True) # save the full model if smp.rank() == 0: smp.save( {"model_state_dict": model_dict}, "/checkpoint.pt", partial=False, )
      注意

      對於完整檢查點,請考量以下事項:

      • 如果設定 gather_to_rank0=True,除了 0 以外的所有排名會傳回空白字典。

      • 對於完整檢查點,您只能對模型執行檢查點。目前不支援最佳化工具狀態的完整檢查點。

      • 完整模型只需要儲存於 smp.rank() == 0

  5. 使用 smp.load() 載入檢查點。根據上一步中執行檢查點的方法,選擇下列兩個選項其中之一:

    • 選項 1:載入部分檢查點。

      checkpoint = smp.load("/checkpoint.pt", partial=True) model.load_state_dict(checkpoint["model_state_dict"], same_partition_load=False) optimizer.load_state_dict(checkpoint["optimizer_state_dict"])

      如果您知道分割區不會變更,則可以設定 model.load_state_dict() 中的 same_partition_load=True 以取得更快的載入。

    • 選項 2:載入完整檢查點。

      if smp.rdp_rank() == 0: checkpoint = smp.load("/checkpoint.pt", partial=False) model.load_state_dict(checkpoint["model_state_dict"])

      if smp.rdp_rank() == 0 條件為非必要,但它可以協助避免不同 MP_GROUP 之間的多餘載入。張量平行處理目前不支援完整檢查點最佳化工具狀態字典。

檢查點分散 TensorFlow 式模型

若要在使用 TensorFlow 模型平行度訓練時儲存模型,請使用模 SageMaker 型平行程度庫提供的下列功能。

微調分散式模型

微調需要在訓練指令碼中設定。下列程式碼片段顯示使用 AutoModelForCausalLM 類別 Hugging Face 變壓器的訓練指令碼的範例結構,其中包含註冊smdistributed.model.parallel.torch模組和設定以進行微調的修改。

注意

使用已啟動的 smp.delayed_param_initialization 功能對分散式轉換器 (由 smp.DistributedModel() 包裝的轉換器模型) 進行微調,需要使用 FSx for Lustre 檔案系統來設定微調任務。如果您想要使用已延遲的參數初始化選項微調大規模模型,則應設定 FSx for Lustre 檔案系統。

import argparse from transformers import AutoModelForCausalLM import smdistributed.modelparallel import smdistributed.modelparallel.torch as smp def parse_args(): parser = argparse.ArgumentParser() # set an arg group for model model_grp = parser.add_argument_group( title="model", description="arguments to describe model configuration" ) ... # set up numerous args to parse from the configuration dictionary to the script for training # add arg for activating fine-tuning model_grp.add_argument( "--fine_tune", type=int, default=0, help="Fine-tune model from checkpoint or pretrained model", ) def main(): """Main function to train GPT.""" args = parse_args() ... # parse numerous args if args.fine_tune > 0 and args.delayed_param > 0 and smp.rank() == 0: pretrained_model = AutoModelForCausalLM.from_pretrained( args.model_name or args.model_dir ) model_state_dict = pretrained_model.state_dict() path = os.path.join(args.model_dir, "fullmodel.pt") torch.save(model_state_dict, path) # create a Transformer model and wrap by smp.model_creation() # with options to configure model parallelism parameters offered by SageMaker with smp.model_creation( tensor_parallelism=smp.tp_size() > 1 or args.use_distributed_transformer > 0, zero_init=args.use_distributed_transformer == 0, dtype=dtype, distribute_embedding=args.sharded_data_parallel_degree > 1 and smp.tp_size() > 1, use_alibi=args.alibi > 0, attention_in_fp32=args.attention_in_fp32 > 0, fp32_residual_addition=args.residual_addition_in_fp32 > 0, query_key_layer_scaling=args.query_key_layer_scaling > 0 and args.bf16 < 1, fused_softmax=args.fused_softmax > 0, fused_dropout=args.fused_dropout > 0, fused_bias_gelu=args.fused_bias_gelu > 0, flash_attention=args.flash_attention > 0, ): if args.fine_tune > 0 and args.delayed_param == 0: model = AutoModelForCausalLM.from_pretrained( args.model_name or args.model_dir ) else: model = AutoModelForCausalLM.from_config(model_config) # wrap the model by smp.DistributedModel() to apply SageMaker model parallelism model = smp.DistributedModel( model, trace_device="gpu", backward_passes_per_step=args.gradient_accumulation ) # wrap the optimizer by smp.DistributedOptimizer() to apply SageMaker model parallelism optimizer= ... # define an optimizer optimizer = smp.DistributedOptimizer( optimizer, static_loss_scale=None, dynamic_loss_scale=True, dynamic_loss_args={"scale_window": 1000, "min_scale": 1, "delayed_shift": 2}, ) # for fine-tuning, use smp.resume_from_checkpoint() to load a pre-trained model if args.fine_tune > 0 and args.delayed_param > 0: smp.resume_from_checkpoint(args.model_dir, tag="fullmodel.pt", partial=False)

如需訓練指令碼和 Jupyter 筆記本的完整範例,請參閱範例儲存庫 PyTorch中的 GPT-2 SageMaker 範例 GitHub