Überprüfung und Feinabstimmung eines Modells mit Modellparallelität - Amazon SageMaker

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Überprüfung und Feinabstimmung eines Modells mit Modellparallelität

Die SageMaker Modellparallelitätsbibliothek bietet Checkpointing-APIs, um den Modellstatus und den Optimiererstatus nach den verschiedenen Modellparallelitätsstrategien zu speichern und Checkpoints für kontinuierliches Training zu laden, von denen aus Sie das Training und die Feinabstimmung neu starten möchten. Die APIs unterstützen auch Optionen zum teilweisen oder vollständigen Speichern des Modells und des Optimiererstatus.

Überprüfung eines verteilten Modells

Wählen Sie je nach Framework zwischen PyTorch und TensorFlow und Version der SageMaker Modellparallelitätsbibliothek, die Sie verwenden, eines der folgenden Themen aus.

Checkpointing eines verteilten PyTorch Modells (für die SageMaker Modellparallelitätsbibliothek v1.10.0 und höher)

Die SageMaker Modellparallelitätsbibliothek bietet Checkpoint-APIs zum Speichern und Laden vollständiger oder teilweiser Checkpoints des verteilten Modellstatus und seines Optimiererstatus.

Anmerkung

Diese Checkpointing-Methode wird empfohlen, wenn Sie PyTorch und die SageMaker Modellparallelitätsbibliothek v1.10.0 oder höher verwenden.

Teilweises Checkpointing

Um Checkpoints eines mit Modellparallelität trainierten Modells zu speichern, verwenden Sie die smdistributed.modelparallel.torch.save_checkpointAPI, wobei die Option für partielles Checkpointing auf true (partial=True) gesetzt ist. Dadurch wird jede Modellpartition einzeln gespeichert. Neben dem Modell und dem Status des Optimierers können Sie mit dem user_content Argument auch alle zusätzlichen benutzerdefinierten Daten speichern. Das Prüfpunktmodell, der Optimierer und der Benutzerinhalt werden als separate Dateien gespeichert. Der save_checkpoint API-Aufruf erstellt Checkpoint-Ordner in der folgenden Struktur.

- path - ${tag}_partial (folder for partial checkpoints) - model_rankinfo.pt - optimizer_rankinfo.pt - fp16_states_rankinfo.pt - user_content.pt - $tag (checkpoint file for full checkpoints) - user_content_$tag (user_content file for full checkpoints) - newest (a file that indicates the newest checkpoint)

Um das Training von partiellen Checkpoints aus fortzusetzen, verwenden Sie die smdistributed.modelparallel.torch.resume_from_checkpoint-API mit partial=True und geben Sie das Checkpoint-Verzeichnis und das Tag an, das beim Speichern der partiellen Checkpoints verwendet wurde. Beachten Sie, dass das tatsächliche Laden der Modellgewichte nach der Modellpartitionierung erfolgt, also während des ersten Durchlaufs der Trainingsschrittfunktion mit smdistributed.modelparallel.torch.step -dekoriertem Dekor.

Beim Speichern eines partiellen Checkpoints speichert die Bibliothek auch die Entscheidung für die Modellpartition als Datei mit der .pt Dateierweiterung. Umgekehrt lädt die Bibliothek die Partitionsentscheidungsdateien zusammen, wenn der Vorgang vom partiellen Checkpoint aus fortgesetzt wird. Sobald die Partitionsentscheidung geladen ist, können Sie die Partition nicht mehr ändern.

Der folgende Codeausschnitt zeigt, wie Sie die Checkpoint-APIs in einem PyTorch Trainingsskript festlegen.

import smdistributed.modelparallel.torch as smp model = ... model = smp.DistributedModel(model) optimizer = ... optimizer = smp.DistributedOptimizer(optimizer) user_content = ... # additional custom data checkpoint_path = "/opt/ml/checkpoint/model_parallel" # Save a checkpoint. smp.save_checkpoint( path=checkpoint_path, tag=f"total_steps{total_steps}", partial=True, model=model, optimizer=optimizer, user_content=user_content num_kept_partial_checkpoints=5 ) # Load a checkpoint. # This automatically loads the most recently saved checkpoint. smp_checkpoint = smp.resume_from_checkpoint( path=checkpoint_path, partial=True )

Vollständiges Checkpointing

Um das endgültige Modellartefakt für Inferenzzwecke zu speichern, verwenden Sie die smdistributed.modelparallel.torch.save_checkpoint API mit partial=False, die die Modellpartitionen zu einem einzigen Modellartefakt kombiniert. Beachten Sie, dass die Zustände des Optimierers dabei nicht kombiniert werden.

Um das Training mit bestimmten Gewichten zu initialisieren, können Sie bei einem vollständigen Modell-Checkpoint die smdistributed.modelparallel.torch.resume_from_checkpoint API mit partial=False verwenden. Beachten Sie, dass dadurch keine Optimizer-Status geladen werden.

Anmerkung

Bei der Tensor-Parallelität muss das state_dict im Allgemeinen zwischen der ursprünglichen Modellimplementierung und der DistributedModel-Implementierung übersetzt werden. Optional können Sie die state_dict Übersetzungsfunktion als Argument für smdistributed.modelparallel.torch.resume_from_checkpoint angeben. Für Ab Werk unterstützte Modelle kümmert sich die Bibliothek jedoch automatisch um diese Übersetzung.

Der folgende Code zeigt ein Beispiel für die Verwendung der Checkpoint-APIs für das vollständige Checkpointing eines PyTorch Modells, das mit Modellparallelität trainiert wurde.

import smdistributed.modelparallel.torch as smp model = ... model = smp.DistributedModel(model) optimizer = ... optimizer = smp.DistributedOptimizer(optimizer) user_content = ... # additional custom data checkpoint_path = "/opt/ml/checkpoint/model_parallel" # Save a checkpoint. smp.save_checkpoint( path=checkpoint_path, tag=f"total_steps{total_steps}", partial=False, model=model, optimizer=optimizer, user_content=user_content num_kept_partial_checkpoints=5 ) # Load a checkpoint. # This automatically loads the most recently saved checkpoint. smp_checkpoint = smp.resume_from_checkpoint( path=checkpoint_path, partial=False )

Checkpointing eines verteilten PyTorch Modells (für die SageMaker Modellparallelitätsbibliothek zwischen v1.6.0 und v1.9.0)

Die SageMaker Modellparallelitätsbibliothek bietet Python-Funktionen zum Speichern teilweiser oder vollständiger Checkpoints für Trainingsaufträge mit Tensorparallelität. Das folgende Verfahren zeigt, wie Sie smp.save() und smp.load() verwenden, um einen Prüfpunkt zu speichern und zu laden, wenn Sie Tensor-Parallelität verwenden.

Anmerkung

Diese Checkpointing-Methode wird empfohlen PyTorch, wenn Sie Tensor-Parallelität, und die SageMaker Modellparallelitätsbibliothek zwischen v1.6.0 und v1.9.0 verwenden.

  1. Bereiten Sie ein Modellobjekt vor und umschließen Sie es mit der Wrapper-Funktion smp.DistributedModel() der Bibliothek.

    model = MyModel(...) model = smp.DistributedModel(model)
  2. Bereiten Sie einen Optimierer für das Modell vor. Ein Satz von Modellparametern ist ein iterierbares Argument, das von Optimiererfunktionen benötigt wird. Um einen Satz von Modellparametern vorzubereiten, müssen Sie model.parameters() verarbeiten, um den einzelnen Modellparametern eindeutige IDs zuzuweisen.

    Wenn der iterierbare Modellparameter Parameter mit doppelten IDs enthält, schlägt das Laden des Optimizerstatus mit Checkpoints fehl. Um eine iterierbare Anzahl von Modellparametern mit eindeutigen IDs für Ihren Optimierer zu erstellen, gehen Sie wie folgt vor:

    unique_params = [] unique_params_set = set() for p in model.parameters(): if p not in unique_params_set: unique_params.append(p) unique_params_set.add(p) del unique_params_set optimizer = MyOpt(unique_params, ...)
  3. Wickeln Sie den Optimizer mithilfe der Wrapper-Funktion smp.DistributedOptimizer() der Bibliothek ein.

    optimizer = smp.DistributedOptimizer(optimizer)
  4. Speichern Sie das Modell und den Status des Optimierers mit smp.save(). Wählen Sie eine der folgenden beiden Optionen aus, abhängig davon, wie Überprüfung gespeichert werden soll:

    • Option 1: Speichern Sie ein Teilmodell auf jedem mp_rank für ein einzelnes MP_GROUP.

      model_dict = model.local_state_dict() # save a partial model opt_dict = optimizer.local_state_dict() # save a partial optimizer state # Save the dictionaries at rdp_rank 0 as a checkpoint if smp.rdp_rank() == 0: smp.save( {"model_state_dict": model_dict, "optimizer_state_dict": opt_dict}, f"/checkpoint.pt", partial=True, )

      Bei der Tensorparallelität speichert die Bibliothek Dateien mit Prüfpunkten, die im folgenden Format benannt sind: checkpoint.pt_{pp_rank}_{tp_rank}.

      Anmerkung

      Stellen Sie bei der Tensorparallelität sicher, dass Sie die if-Anweisung auf if smp.rdp_rank() == 0 statt auf if smp.dp_rank() == 0 setzen. Wenn der Optimiererstatus mit Tensorparallelität geteilt wird, müssen alle parallelen Ränge mit reduzierten Daten ihre eigene Partition des Optimiererstatus speichern. Die Verwendung einer falschen if -Anweisungfür Checkpoints kann dazu führen, dass der Trainingsjob ins Stocken gerät. Weitere Informationen zur Verwendung von if smp.dp_rank() == 0 ohne Tensorparallelität finden Sie unter Allgemeine Anweisungen zum Speichern und Laden in der SageMaker Python-SDK-Dokumentation.

    • Option 2: Speichern Sie das vollständige Modell.

      if smp.rdp_rank() == 0: model_dict = model.state_dict(gather_to_rank0=True) # save the full model if smp.rank() == 0: smp.save( {"model_state_dict": model_dict}, "/checkpoint.pt", partial=False, )
      Anmerkung

      Beachten Sie für ein vollständiges Checkpointing Folgendes:

      • Wenn Sie gather_to_rank0=True einstellen, geben alle anderen Ränge als 0 leere Wörterbücher zurück.

      • Für ein vollständiges Checkpointing können Sie nur das Modell mit Checkpoints versehen. Ein vollständiges Checkpointing von Optimizer-Status wird derzeit nicht unterstützt.

      • Das vollständige Modell muss nur unter smp.rank() == 0 gespeichert werden.

  5. Laden Sie die Checkpoints mit smp.load(). Wählen Sie, abhängig von der Überprüfung im vorherigen Schritt, eine der folgenden beiden Optionen aus:

    • Option 1: Laden Sie die partiellen Checkpoints.

      checkpoint = smp.load("/checkpoint.pt", partial=True) model.load_state_dict(checkpoint["model_state_dict"], same_partition_load=False) optimizer.load_state_dict(checkpoint["optimizer_state_dict"])

      Sie können same_partition_load=True auf model.load_state_dict() für ein schnelleres Laden einstellen, wenn Sie wissen, dass sich die Partition nicht ändert.

    • Option 2: Lädt die vollständigen Checkpoints.

      if smp.rdp_rank() == 0: checkpoint = smp.load("/checkpoint.pt", partial=False) model.load_state_dict(checkpoint["model_state_dict"])

      Die if smp.rdp_rank() == 0 Bedingung ist nicht erforderlich, kann aber dazu beitragen, redundantes Laden zwischen verschiedenen MP_GROUPs zu vermeiden. Das vollständige State-Diktat des Checkpointing-Optimizers wird derzeit bei der Tensorparallelität nicht unterstützt.

Checkpointing eines verteilten TensorFlow Modells

Um ein TensorFlow Modell beim Training mit Modellparallelität zu speichern, verwenden Sie die folgenden Funktionen, die von der SageMaker Modellparallelitätsbibliothek bereitgestellt werden.

Feinabstimmung eines verteilten Modells

Die Feinabstimmung muss in Ihrem Trainingsskript konfiguriert werden. Der folgende Codeausschnitt zeigt eine Beispielstruktur eines Trainingsskripts, das die AutoModelForCausalLM-Klasse von Hugging Face Transformers mit Änderungen zur Registrierung der smdistributed.model.parallel.torch Module und Einstellungen für die Feinabstimmung verwendet.

Anmerkung

Für die Feinabstimmung eines verteilten Transformers (ein Transformer-Modell von smp.DistributedModel() eingeschlossen) mit aktivierter Funktion smp.delayed_param_initialization muss der Feinabstimmungsjob mit einem FSx for Lustre-Dateisystem konfiguriert werden. In Fällen, in denen Sie ein umfangreiches Modell mit der Option zur verzögerten Parameterinitialisierung optimieren möchten, sollten Sie ein FSx for Lustre-Dateisystem einrichten.

import argparse from transformers import AutoModelForCausalLM import smdistributed.modelparallel import smdistributed.modelparallel.torch as smp def parse_args(): parser = argparse.ArgumentParser() # set an arg group for model model_grp = parser.add_argument_group( title="model", description="arguments to describe model configuration" ) ... # set up numerous args to parse from the configuration dictionary to the script for training # add arg for activating fine-tuning model_grp.add_argument( "--fine_tune", type=int, default=0, help="Fine-tune model from checkpoint or pretrained model", ) def main(): """Main function to train GPT.""" args = parse_args() ... # parse numerous args if args.fine_tune > 0 and args.delayed_param > 0 and smp.rank() == 0: pretrained_model = AutoModelForCausalLM.from_pretrained( args.model_name or args.model_dir ) model_state_dict = pretrained_model.state_dict() path = os.path.join(args.model_dir, "fullmodel.pt") torch.save(model_state_dict, path) # create a Transformer model and wrap by smp.model_creation() # with options to configure model parallelism parameters offered by SageMaker with smp.model_creation( tensor_parallelism=smp.tp_size() > 1 or args.use_distributed_transformer > 0, zero_init=args.use_distributed_transformer == 0, dtype=dtype, distribute_embedding=args.sharded_data_parallel_degree > 1 and smp.tp_size() > 1, use_alibi=args.alibi > 0, attention_in_fp32=args.attention_in_fp32 > 0, fp32_residual_addition=args.residual_addition_in_fp32 > 0, query_key_layer_scaling=args.query_key_layer_scaling > 0 and args.bf16 < 1, fused_softmax=args.fused_softmax > 0, fused_dropout=args.fused_dropout > 0, fused_bias_gelu=args.fused_bias_gelu > 0, flash_attention=args.flash_attention > 0, ): if args.fine_tune > 0 and args.delayed_param == 0: model = AutoModelForCausalLM.from_pretrained( args.model_name or args.model_dir ) else: model = AutoModelForCausalLM.from_config(model_config) # wrap the model by smp.DistributedModel() to apply SageMaker model parallelism model = smp.DistributedModel( model, trace_device="gpu", backward_passes_per_step=args.gradient_accumulation ) # wrap the optimizer by smp.DistributedOptimizer() to apply SageMaker model parallelism optimizer= ... # define an optimizer optimizer = smp.DistributedOptimizer( optimizer, static_loss_scale=None, dynamic_loss_scale=True, dynamic_loss_args={"scale_window": 1000, "min_scale": 1, "delayed_shift": 2}, ) # for fine-tuning, use smp.resume_from_checkpoint() to load a pre-trained model if args.fine_tune > 0 and args.delayed_param > 0: smp.resume_from_checkpoint(args.model_dir, tag="fullmodel.pt", partial=False)

Ein vollständiges Beispiel für Trainingsskripte und Jupyter-Notebooks finden Sie in den GPT-2-Beispielen für PyTorch im SageMaker Beispiel GitHub-Repository .