원격 함수 호출 - Amazon SageMaker

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

원격 함수 호출

@remote 데코레이터 내에서 함수를 호출하려면 다음 방법 중 하나를 사용하세요.

@remote 데코레이터 메서드를 사용하여 함수를 호출하는 경우 훈련 작업은 함수가 완료될 때까지 기다렸다가 새 작업을 시작합니다. 그러나 RemoteExecutor 를 사용하는 경우 둘 이상의 작업을 병렬로 실행할 API수 있습니다. 다음 섹션에서는 함수를 호출하는 두 가지 방법을 모두 보여줍니다.

@remote 데코레이터를 사용하여 함수를 호출

@remote 데코레이터를 사용하여 함수에 주석을 달 수 있습니다. SageMaker 는 데코레이터 내부의 코드를 SageMaker 훈련 작업으로 변환합니다. 그러면 훈련 작업은 데코레이터 내에서 함수를 호출하고 작업이 완료될 때까지 기다립니다. 다음 코드 예제에서는 필수 라이브러리를 가져오고, 인스턴스를 SageMaker 시작하고, @remote 데코레이터로 매트릭스 곱셈에 주석을 지정하는 방법을 보여줍니다.

from sagemaker.remote_function import remote import numpy as np @remote(instance_type="ml.m5.large") def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) assert (matrix_multiply(a, b) == np.array([1,2])).all()

데코레이터는 다음과 같이 정의됩니다.

def remote( *, **kwarg): ...

지정된 함수를 호출하면 SageMaker Python은 오류로 인해 발생한 모든 예외를 로컬 메모리에 SDK 로드합니다. 다음 코드 예제에서는 divide 함수에 대한 첫 번째 호출이 성공적으로 완료되고 그 결과가 로컬 메모리로 로드됩니다. divide 함수를 두 번째로 호출하면 코드가 오류를 반환하고 이 오류가 로컬 메모리로 로드됩니다.

from sagemaker.remote_function import remote import pytest @remote() def divide(a, b): return a/b # the underlying job is completed successfully # and the function return is loaded assert divide(10, 5) == 2 # the underlying job fails with "AlgorithmError" # and the function exception is loaded into local memory with pytest.raises(ZeroDivisionError): divide(10, 0)
참고

데코레이팅된 함수는 원격 작업으로 실행됩니다. 스레드가 중단되더라도 기본 작업은 중지되지 않습니다.

로컬 변수 값을 변경하는 방법

데코레이터 함수는 원격 머신에서 실행됩니다. 데코레이팅된 함수 내에서 로컬이 아닌 변수 또는 입력 인수를 변경해도 로컬 값은 변경되지 않습니다.

다음 코드 예제에서는 데코레이터 함수 내에 목록과 딕셔너리가 추가됩니다. 이는 데코레이터 함수가 호출될 때 변경되지 않습니다.

a = [] @remote def func(): a.append(1) # when func is invoked, a in the local memory is not modified func() func() # a stays as [] a = {} @remote def func(a): # append new values to the input dictionary a["key-2"] = "value-2" a = {"key": "value"} func(a) # a stays as {"key": "value"}

데코레이터 함수 내에서 선언된 로컬 변수 값을 변경하려면 함수에서 변수를 반환하세요. 다음 코드 예제는 함수에서 반환되는 로컬 변수 값이 변경되는 것을 보여줍니다.

a = {"key-1": "value-1"} @remote def func(a): a["key-2"] = "value-2" return a a = func(a) -> {"key-1": "value-1", "key-2": "value-2"}

데이터 직렬화 및 역직렬화

원격 함수를 호출하면 는 입력 및 출력 단계에서 함수 인수를 SageMaker 자동으로 직렬화합니다. 함수 인수 및 반환은 cloudpickle을 사용하여 직렬화됩니다. 는 다음 Python 객체 및 함수의 직렬화를 SageMaker 지원합니다.

  • 딕셔너리, 리스트, 부동 소수, 정수, 문자열, 부울 값, 튜플 등 내장된 Python 객체

  • Numpy 배열

  • Pandas Dataframe

  • Scikit-learn 데이터 세트 및 예측기

  • PyTorch 모델

  • TensorFlow 모델

  • 에 대한 부스터 클래스 XGBoost

다음은 몇 가지 제한 사항과 함께 사용 가능합니다.

  • Dask DataFrames

  • XGBoost Dmatrix 클래스

  • TensorFlow 데이터 세트 및 하위 클래스

  • PyTorch 모델

다음 섹션에는 원격 함수에 몇 가지 제한이 있는 이전 Python 클래스를 사용하는 모범 사례, 직렬화된 데이터를 SageMaker 저장하는 위치 및 이에 대한 액세스를 관리하는 방법에 대한 정보가 포함되어 있습니다.

원격 데이터 직렬화 지원이 제한된 Python 클래스 모범 사례

이 섹션에 나열된 Python 클래스는 제한 사항과 함께 사용할 수 있습니다. 다음 섹션에서는 다음과 같은 Python 클래스를 사용하는 방법에 대한 모범 사례를 설명합니다.

  • Dask DataFrames

  • XGBoost DMatric 클래스

  • TensorFlow 데이터 세트 및 하위 클래스

  • PyTorch 모델

Dask는 Python에서 병렬 컴퓨팅에 사용되는 오픈 소스 라이브러리입니다. 이 섹션에서는 다음을 보여줍니다.

  • Dask를 원격 함수 DataFrame 에 전달하는 방법

  • Dask DataFrame의 요약 통계를 Pandas로 변환하는 방법 DataFrame

Dask를 원격 함수 DataFrame 에 전달하는 방법

Dask DataFrames는 사용 가능한 것보다 더 많은 메모리가 필요한 데이터 세트를 보유할 수 있기 때문에 대규모 데이터 세트를 처리하는 데 자주 사용됩니다. 이는 Dask가 로컬 데이터를 메모리에 로드하지 DataFrame 않기 때문입니다. Dask DataFrame를 원격 함수에 함수 인수로 전달하는 경우 Dask는 데이터 자체 대신 로컬 디스크 또는 클라우드 스토리지의 데이터에 대한 참조를 전달할 수 있습니다. 다음 코드는 비어 있는 에서 작동하는 원격 함수 DataFrame 내에서 Dask를 전달하는 예를 보여줍니다 DataFrame.

#Do not pass a Dask DataFrame to your remote function as follows def clean(df: dask.DataFrame ): cleaned = df[] \ ...

Dask는 를 사용하는 경우에만 Dask의 데이터를 메모리 DataFrame 로 로드합니다 DataFrame . 원격 함수 DataFrame 내에서 Dask를 사용하려면 데이터 에 대한 경로를 제공합니다. 그러면 Dask는 코드 실행 시 지정한 데이터 경로에서 직접 데이터 세트를 읽습니다.

다음 코드 예제는 원격 함수 내에서 Dask DataFrame를 사용하는 방법을 보여줍니다clean. 코드 예제에서 raw_data_path는 Dask 대신 정리로 전달됩니다 DataFrame. 코드가 실행되면 raw_data_path에 지정된 Amazon S3 버킷의 위치에서 데이터 세트를 직접 읽습니다. 그런 다음 persist 함수는 데이터 세트를 메모리에 유지하여 후속 random_split 함수를 용이하게 하고 Dask DataFrame API 함수를 사용하여 S3 버킷의 출력 데이터 경로에 다시 기록합니다.

import dask.dataframe as dd @remote( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) #pass the data path to your remote function rather than the Dask DataFrame itself def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]): df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\ .drop(['column_b', 'column_c'], axis=1)\ .persist() #keep the data in memory to facilitate the following random_split operation train_df, test_df = cleaned.random_split(split_ratio, random_state=10) train_df.to_parquet(os.path.join(output_data_path, 'train') test_df.to_parquet(os.path.join(output_data_path, 'test')) clean("s3://amzn-s3-demo-bucket/raw/", "s3://amzn-s3-demo-bucket/cleaned/", split_ratio=[0.7, 0.3])
Dask DataFrame의 요약 통계를 Pandas로 변환하는 방법 DataFrame

다음 예제 코드와 같이 compute 메서드를 호출 DataFrame 하여 Dask의 요약 통계를 Pandas로 변환할 DataFrame 수 있습니다. 이 예제에서는 S3 버킷에 메모리 또는 Pandas 데이터프레임에 맞지 DataFrame 않는 대형 Dask가 포함되어 있습니다. 다음 예제에서는 원격 함수가 데이터 세트를 스캔하고 의 출력 통계가 포함된 Dask DataFrame를 Pandas describe에 반환합니다 DataFrame.

executor = RemoteExecutor( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) future = executor.submit(lambda: dd.read_parquet("s3://amzn-s3-demo-bucket/raw/").describe().compute()) future.result()

DMatrix 는 에서 데이터를 로드XGBoost하는 데 사용되는 내부 데이터 구조입니다. 컴퓨팅 세션 간에 쉽게 이동하기 위해 DMatrix 객체를 선택할 수 없습니다. DMatrix 인스턴스를 직접 전달하면 에서 실패합니다SerializationError.

원격 함수에 데이터 객체를 전달하고 를 사용하여 학습하는 방법 XGBoost

Pandas를 DMatrix 인스턴스 DataFrame 로 변환하고 원격 함수에서 훈련하는 데 사용하려면 다음 코드 예제와 같이 Pandas를 원격 함수에 직접 전달합니다.

import xgboost as xgb @remote def train(df, params): #Convert a pandas dataframe into a DMatrix DataFrame and use it for training dtrain = DMatrix(df) return xgb.train(dtrain, params)

TensorFlow 데이터 세트 및 하위 클래스는 훈련 중에 데이터를 로드 TensorFlow 하는 데 사용되는 의 내부 객체입니다. 컴퓨팅 세션 간에 쉽게 이동하기 위해 TensorFlow 데이터 세트 및 하위 클래스를 선택할 수 없습니다. Tensorflow 데이터 세트 또는 하위 클래스를 직접 전달하면 SerializationError로 실패하게 됩니다. 다음 코드 예제와 같이 Tensorflow I/OAPIs를 사용하여 스토리지에서 데이터를 로드합니다.

import tensorflow as tf import tensorflow_io as tfio @remote def train(data_path: str, params): dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt")) ... train("s3://amzn-s3-demo-bucket/data", {})

PyTorch 모델은 직렬화할 수 있으며 로컬 환경과 원격 함수 간에 전달할 수 있습니다. 로컬 환경과 원격 환경의 디바이스 유형이 (GPUs 및 CPUs)와 같이 다른 경우 훈련된 모델을 로컬 환경에 반환할 수 없습니다. 예를 들어 다음 코드가 없이 로컬 환경에서 개발GPUs되었지만 를 사용하여 인스턴스에서 실행되는 경우 훈련된 모델을 직접 GPUs반환하면 로 이어집니다DeserializationError.

# Do not return a model trained on GPUs to a CPU-only environment as follows @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") # a device without GPU capabilities model = Net().to(device) # train the model ... return model model = train(...) #returns a DeserializationError if run on a device with GPU

GPU 환경에서 훈련된 모델을 CPU 기능만 포함된 모델로 반환하려면 아래 코드 예제와 같이 PyTorch 모델 I/O를 APIs 직접 사용합니다.

import s3fs model_path = "s3://amzn-s3-demo-bucket/folder/" @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") model = Net().to(device) # train the model ... fs = s3fs.FileSystem() with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file: torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU) train(...) #use the model to train on either CPUs or GPUs model = Net() fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file: model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))

가 직렬화된 데이터를 SageMaker 저장하는 위치

원격 함수를 호출하면 는 입력 및 출력 단계에서 함수 인수를 SageMaker 자동으로 직렬화하고 값을 반환합니다. 직렬화된 이 데이터는 S3 버킷의 루트 디렉터리에 저장됩니다. 구성 파일에 루트 디렉터리 <s3_root_uri>를 지정합니다. 파라미터 job_name이 자동으로 생성됩니다.

루트 디렉터리에서 현재 작업 디렉터리, 직렬화된 함수, 직렬화된 함수에 대한 인수, 결과 및 직렬화된 함수 호출로 인해 발생한 모든 예외를 포함하는 <job_name> 폴더를 SageMaker 생성합니다.

<job_name> 아래 디렉터리 workdir에는 현재 작업 디렉터리의 압축 아카이브가 들어 있습니다. 압축 아카이브에는 작업 디렉터리의 모든 Python 파일과 requirements.txt 파일이 포함되어 있는데 이는 원격 함수를 실행하는 데 필요한 종속성을 지정합니다.

다음은 구성 파일에 지정하는 S3 버킷의 폴더 구조 예제입니다.

<s3_root_uri>/ # specified by s3_root_uri or S3RootUri <job_name>/ #automatically generated for you workdir/workspace.zip # archive of the current working directory (workdir) function/ # serialized function arguments/ # serialized function arguments results/ # returned output from the serialized function including the model exception/ # any exceptions from invoking the serialized function

S3 버킷에 지정하는 루트 디렉터리는 장기 스토리지용이 아닙니다. 직렬화된 데이터는 직렬화 중에 사용된 Python 버전 및 기계 학습(ML) 프레임워크 버전과 밀접하게 연결되어 있습니다. Python 버전 또는 ML 프레임워크를 업그레이드하면 직렬화된 데이터를 사용하지 못할 수 있습니다. 대신 다음을 수행합니다.

  • Python 버전과 ML 프레임워크에 구애받지 않는 형식으로 모델 및 모델 아티팩트를 저장합니다.

  • Python 또는 ML 프레임워크를 업그레이드하는 경우 장기 스토리지에서 모델 결과에 액세스할 수 있습니다.

중요

지정된 시간이 지난 후 직렬화된 데이터를 삭제하려면 S3 버킷에 수명 구성을 설정합니다.

참고

Python 피클 모듈로 직렬화된 파일은 CSV, Parquet 및 를 포함한 다른 데이터 형식보다 휴대성이 떨어질 수 있습니다JSON. 출처를 알 수 없는 피클된 파일을 로드하지 않도록 주의하세요.

원격 함수의 구성 파일에 포함할 내용에 대한 추가 정보는 구성 파일을 참조하세요.

직렬화된 데이터에 대한 액세스

관리자는 직렬화된 데이터 설정을 제공할 수 있으며 여기에는 구성 파일에서의 위치 및 모든 암호화 설정이 포함됩니다. 기본적으로 직렬화된 데이터는 AWS Key Management Service (AWS KMS) 키로 암호화됩니다. 또한 관리자는 버킷 정책을 사용하여 구성 파일에 지정한 루트 디렉터리의 액세스를 제한할 수 있습니다. 구성 파일은 여러 프로젝트 및 작업에서 공유하며 사용할 수 있습니다. 자세한 정보는 구성 파일을 참조하세요.

RemoteExecutor API 를 사용하여 함수 호출

를 사용하여 함수를 호출RemoteExecutorAPI할 수 있습니다. SageMaker PythonSDK은 RemoteExecutor 호출 내의 코드를 훈련 작업으로 SageMaker 변환합니다. 그러면 훈련 작업에서 함수를 비동기식 작업으로 호출하고 퓨처를 반환합니다. 를 사용하는 경우 둘 이상의 훈련 작업을 병렬로 실행할 RemoteExecutor API수 있습니다. Python의 퓨처에 대한 자세한 내용은 Future를 참조하세요.

다음 코드 예제에서는 필요한 라이브러리를 가져오고, 함수를 정의하고, SageMaker 인스턴스를 시작하고, API를 사용하여 병렬로 2 작업을 실행하기 위한 요청을 제출하는 방법을 보여줍니다.

from sagemaker.remote_function import RemoteExecutor def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) with RemoteExecutor(max_parallel_job=2, instance_type="ml.m5.large") as e: future = e.submit(matrix_multiply, a, b) assert (future.result() == np.array([1,2])).all()

RemoteExecutor 클래스는 Concurrent.Futures.Executor 라이브러리를 구현한 것입니다.

다음 코드 예제에서는 RemoteExecutorAPI를 사용하여 함수를 정의하고 호출하는 방법을 보여줍니다. 이 예제에서 RemoteExecutor는 총 4개 작업을 제출하지만 2개만 병렬로 제출합니다. 마지막 두 개 작업은 오버헤드를 최소화하면서 클러스터를 재사용합니다.

from sagemaker.remote_function.client import RemoteExecutor def divide(a, b): return a/b with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e: futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]] for future in futures: print(future.result())

max_parallel_job 파라미터는 컴퓨팅 리소스 할당을 최적화하지 않고 속도 제한 메커니즘으로만 사용됩니다. 이전 코드 예제에서 RemoteExecutor는 작업이 제출되기 전에 두 개의 병렬 작업을 위한 컴퓨팅 리소스를 예약하지 않습니다. max_parallel_job 또는 @remote 데코레이터의 기타 파라미터에 대한 자세한 내용은 원격 함수 클래스 및 메서드 사양을 참조하세요.

의 미래 클래스 RemoteExecutor API

퓨처 클래스는 반환 함수가 비동기식으로 호출될 때 훈련 작업의 반환 함수를 나타내는 퍼블릭 클래스입니다. 퓨처 클래스는 Concurrent.futures.future 클래스를 구현합니다. 이 클래스는 기본 작업을 수행하고 데이터를 메모리로 로드하는 데 사용할 수 있습니다.