학습 스크립트 수정 TensorFlow - Amazon SageMaker

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

학습 스크립트 수정 TensorFlow

이 섹션에서는 TensorFlow 교육 스크립트를 수정하여 자동 파티셔닝과 수동 파티셔닝을 위한 SageMaker 모델 병렬화 라이브러리를 구성하는 방법을 알아봅니다. 이 예제 모음에는 하이브리드 모델 및 데이터 병렬화를 위해 Horovod와 통합된 예제도 포함되어 있습니다.

참고

라이브러리에서 지원하는 TensorFlow 버전을 찾으려면 을 참조하십시오. 지원되는 프레임워크 및 AWS 리전

라이브러리를 사용하기 위해 훈련 스크립트를 수정해야 하는 필수 사항은 를 사용한 자동 분할 TensorFlow에 나열되어 있습니다.

Horovod에서 하이브리드 모델 및 데이터 병렬화를 사용하도록 훈련 스크립트를 수정하는 방법을 알아보려면 하이브리드 모델 TensorFlow 및 데이터 병렬화를 위한 Horovod를 사용한 자동 분할을/를 참조하세요.

수동 파티셔닝을 사용하려는 경우에도 수동 분할은 다음과 같습니다. TensorFlow 을/를 검토하세요.

다음 항목에서는 자동 파티셔닝 및 수동 파티셔닝 모델을 위한 모델 병렬화 라이브러리를 구성하는 SageMaker 데 사용할 수 있는 교육 스크립트의 예를 보여줍니다. TensorFlow

참고

자동 파티셔닝은 기본적으로 활성화되어 있습니다. 달리 지정하지 않는 한, 예제 스크립트는 자동 파티셔닝을 사용합니다.

를 사용한 자동 분할 TensorFlow

의 모델 병렬화 라이브러리로 TensorFlow SageMaker 모델을 실행하려면 다음과 같은 교육 스크립트 변경이 필요합니다.

  1. smp.init()을 사용하여 라이브러리를 가져오고 초기화합니다.

  2. Keras 모델 클래스 smp.DistributedModel에서 상속하여 Keras 모델을 정의합니다. smp.DistributedModel 객체의 호출 메서드에서 모델 출력을 반환합니다. 호출 메서드에서 반환되는 모든 텐서는 모델 병렬 장치 간에 브로드캐스트되므로 통신 오버헤드가 발생하므로 호출 메서드 외부에서 필요하지 않은 텐서(예: 중간 활성화)는 반환되지 않아야 합니다.

  3. tf.Dataset.batch() 메서드에서 drop_remainder=True로 설정합니다. 이는 배치 크기를 항상 마이크로배치 수로 나눌 수 있도록 하기 위한 것입니다.

  4. smp.dp_rank()를 사용하여 데이터 파이프라인에서 무작위 작업을 시드(Seed) 하세요. 예를 들어 서로 다른 모델 파티션을 포함하는 GPU 간의 데이터 샘플의 일관성을 보장하기 위한 shuffle(ds, seed=smp.dp_rank())를 시드합니다.

  5. 순방향 및 역방향 로직을 Step Function에 넣고 이를 smp.step로 데코레이트하세요.

  6. reduce_mean과 같은 StepOutput 메서드를 사용하여 마이크로배치의 출력값에 대해 후처리를 수행합니다. smp.step 함수는 smp.DistributedModel의 출력값에 따라 달라지는 반환 값이 있어야 합니다.

  7. 평가 단계가 있는 경우에도 마찬가지로 smp.step로 데코레이팅된 함수 안에 순방향 로직을 배치하고 StepOutput API를 사용하여 출력값을 후처리하세요.

SageMaker의 모델 병렬화 라이브러리 API에 대한 자세한 내용은 API 설명서를 참조하십시오.

다음 Python 스크립트는 변경 후의 훈련 스크립트의 예입니다.

import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

훈련 스크립트 준비를 마쳤으면 2단계: SageMaker Python SDK를 사용하여 교육 작업 시작으로 진행하세요. 하이브리드 모델 및 데이터 병렬화 훈련 작업을 실행하려면 다음 섹션으로 계속합니다.

하이브리드 모델 TensorFlow 및 데이터 병렬화를 위한 Horovod를 사용한 자동 분할

Horovod와 함께 모델 병렬화 라이브러리를 SageMaker 하이브리드 모델 및 데이터 병렬화에 사용할 수 있습니다. 라이브러리가 하이브리드 병렬화를 위해 모델을 분할하는 방법에 대한 자세한 내용은 PyTorch TensorFlow파이프라인 병렬화 (및 에 사용 가능)을/를 참조하세요.

이 단계에서는 모델 병렬 처리 라이브러리에 맞게 교육 스크립트를 수정하는 방법을 중점적으로 다룹니다. SageMaker

2단계: SageMaker Python SDK를 사용하여 교육 작업 시작에서 설정할 하이브리드 병렬화 구성을 선택하도록 훈련 스크립트를 올바르게 설정하려면 데이터 병렬 순위와 모델 병렬 순위를 각각 자동으로 감지하는 라이브러리의 도우미 함수 smp.dp_rank()smp.mp_rank()를 사용하세요.

라이브러리가 지원하는 모든 MPI 프리미티브를 찾으려면 Python SageMaker SDK 설명서의 MPI 기본을 참조하십시오.

스크립트에 필요한 변경 사항은 다음과 같습니다.

  • hvd.allreduce 추가

  • Horovod에서 요구하는 대로 첫 번째 배치 이후에 변수를 브로드캐스팅합니다

  • smp.dp_rank()를 사용하여 데이터 파이프라인의 셔플링 및/또는 샤딩 작업을 시드(seed) 합니다.

참고

Horovod를 사용할 때는 훈련 스크립트에서 hvd.init을 직접 호출해서는 안 됩니다. 대신 의 SageMaker Python SDK True modelparallel 매개변수에서 "horovod" 2단계: SageMaker Python SDK를 사용하여 교육 작업 시작 로 설정해야 합니다. 이렇게 하면 라이브러리가 모델 파티션의 디바이스 할당을 기반으로 Horovod를 내부적으로 초기화할 수 있습니다. 훈련 스크립트에서 hvd.init()을 직접 호출하면 문제가 발생할 수 있습니다.

참고

훈련 스크립트에서 직접 hvd.DistributedOptimizer API를 사용하면 API가 AllReduce 작업을 암시적으로 smp.step 내부에 배치하기 때문에 훈련 성능과 속도가 저하될 수 있습니다. 다음 예와 같이 smp.step에서 반환된 그래디언트에 따라 accumulate() 또는 reduce_mean()을 호출한 후 hvd.allreduce를 직접 호출하여 Horovod와 함께 모델 병렬화 라이브러리를 사용하는 것이 좋습니다.

SageMaker의 모델 병렬화 라이브러리 API에 대한 자세한 내용은 API 설명서를 참조하십시오.

import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))

수동 분할은 다음과 같습니다. TensorFlow

smp.partition 컨텍스트 관리자를 사용하여 작업을 특정 파티션에 배치합니다. smp.partition 컨텍스트에 배치되지 않은 모든 작업은 default_partition에 배치됩니다. SageMaker의 모델 병렬화 라이브러리 API에 대한 자세한 내용은 API 설명서를 참조하십시오.

import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

지원되지 않는 프레임워크 기능

다음 TensorFlow 기능은 라이브러리에서 지원되지 않습니다.

  • 현재 tf.GradientTape()은 지원되지 않습니다. 대신 Optimizer.get_gradients() 또는 Optimizer.compute_gradients()를 사용하여 그래디언트를 계산할 수 있습니다.

  • 현재 tf.train.Checkpoint.restore() API는 지원되지 않습니다. 체크포인팅의 경우 대신 smp.CheckpointManager를 사용합니다. 이는 동일한 API와 기능을 제공합니다. smp.CheckpointManager을 이용한 체크포인트 복원은 첫 번째 단계 이후에 이루어져야 한다는 점에 유의하세요.