Deploy large models for inference with TorchServe
This tutorial demonstrates how to deploy large models and serve inference in Amazon SageMaker AI with
TorchServe on GPUs. This example deploys the OPT-30b model to an ml.g5
instance. You can modify this approach to work with other models and instance
types. Replace the italicized placeholder text in the examples with your own information.
TorchServe is a powerful open platform for large distributed model inference. By
supporting popular libraries such as PyTorch native PiPPy, DeepSpeed, and HuggingFace
Accelerate, it offers uniform handler APIs that remain consistent across distributed large
model and non-distributed model inference scenarios. For more information, see TorchServe’s large model
inference documentation.
Deep learning containers with TorchServe
To deploy a large model with TorchServe on SageMaker AI, you can use one of the SageMaker AI deep learning containers (DLCs). By default, TorchServe is installed in all AWS PyTorch DLCs. During model loading, TorchServe can install specialized libraries tailored for large models such as PiPPy, DeepSpeed, and Accelerate.
The following table lists all of the SageMaker AI DLCs with TorchServe.

DLC category | Framework | Hardware | Example URL |
---|---|---|---|
SageMaker AI framework containers | PyTorch 2.0.0+ | CPU, GPU | 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.1-gpu-py310-cu118-ubuntu20.04-sagemaker |
SageMaker AI framework Graviton containers | PyTorch 2.0.0+ | CPU | 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference-graviton:2.0.1-cpu-py310-ubuntu20.04-sagemaker |
StabilityAI inference containers | PyTorch 2.0.0+ | GPU | 763104351884.dkr.ecr.us-east-1.amazonaws.com/stabilityai-pytorch-inference:2.0.1-sgm0.1.0-gpu-py310-cu118-ubuntu20.04-sagemaker |
Neuron containers | PyTorch 1.13.1 | Neuronx | 763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference-neuron:1.13.1-neuron-py310-sdk2.12.0-ubuntu20.04 |
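Instead of copying an image URI from the table, you can look one up with the SageMaker Python SDK's image_uris.retrieve helper. The following is a sketch only: the Region, framework version, Python version, and instance type are example values, so substitute ones that match your deployment. The resulting URI can be passed as the image_uri when you create the model later in this tutorial.

```
import sagemaker
from sagemaker import image_uris

# Look up a SageMaker AI PyTorch inference DLC URI.
# Example values: choose the Region, version, and instance type that match your setup.
container = image_uris.retrieve(
    framework="pytorch",
    region="us-west-2",
    version="2.0.1",
    py_version="py310",
    image_scope="inference",
    instance_type="ml.g5.24xlarge",
)
print(container)
```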
Getting started
Before deploying your model, complete the prerequisites. You can also configure your model parameters and customize the handler code.
Prerequisites
To get started, ensure that you have the following prerequisites:
-
Ensure that you have access to an AWS account. Set up your environment so that the AWS CLI can access your account through either an AWS IAM user or an IAM role. We recommend using an IAM role. For the purposes of testing in your personal account, you can attach the appropriate managed permissions policies to the IAM role.
For more information about attaching IAM policies to a role, see Adding and removing IAM identity permissions in the AWS IAM User Guide.
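As an example only, the following sketch attaches a managed policy to an existing role with the IAM API. The role name is a placeholder, and AmazonSageMakerFullAccess is just one policy you might choose; attach whichever policies your testing requires.

```
import boto3

iam = boto3.client("iam")

# Example only: replace the role name and policy ARN with your own choices.
iam.attach_role_policy(
    RoleName="my-sagemaker-execution-role",
    PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
)
```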
-
Locally configure your dependencies, as shown in the following examples.
-
Install version 2 of the AWS CLI:
```
# Install the latest AWS CLI v2 if it is not installed
!curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
!unzip awscliv2.zip
# Follow the instructions to install v2 on the terminal
!cat aws/README.md
```
-
Install SageMaker AI and the Boto3 client:
```
# If already installed, update your client
# %pip install sagemaker pip --upgrade --quiet
!pip install -U sagemaker
!pip install -U boto
!pip install -U botocore
!pip install -U boto3
```
Configure model settings and parameters
TorchServe uses torchrun to set up the distributed environment for model parallel
processing. The number of GPUs assigned to each worker is determined by the settings in the
model_config.yaml file. The environment variable CUDA_VISIBLE_DEVICES, which specifies
the GPU device IDs that are visible at a given time, is set based on this number.
For example, suppose there are 8 GPUs on a node and one worker needs 4 GPUs
on a node (nproc_per_node=4
). In this case, TorchServe assigns four
GPUs to the first worker (CUDA_VISIBLE_DEVICES="0,1,2,3"
) and four GPUs
to the second worker (CUDA_VISIBLE_DEVICES="4,5,6,7"
).
In addition to this default behavior, TorchServe provides the flexibility for
users to specify GPUs for a worker. For instance, if you set the variable
deviceIds: [2,3,4,5] in the model config YAML file and set nproc_per_node=2, then
TorchServe assigns CUDA_VISIBLE_DEVICES="2,3" to the first worker and
CUDA_VISIBLE_DEVICES="4,5" to the second worker.
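As an illustration only, that two-worker scenario could be expressed with a config fragment like the following. This is a sketch, not the tutorial's full configuration file; the worker counts are assumptions chosen to match the scenario described above.

```
# Sketch: two workers, each using 2 of the GPUs listed in deviceIds
minWorkers: 2
maxWorkers: 2
parallelType: "tp"
deviceType: "gpu"
deviceIds: [2,3,4,5]   # sets CUDA_VISIBLE_DEVICES per worker
torchrun:
    nproc-per-node: 2  # GPUs per worker
```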
In the following model_config.yaml example, we configure both
front-end and back-end parameters for the OPT-30b model, including parallelType,
deviceType, deviceIds, and torchrun. For
more detailed information about the front-end parameters you can configure, see the
PyTorch GitHub documentation.
```
# TorchServe front-end parameters
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 1200
parallelType: "tp"
deviceType: "gpu"
# example of user specified GPU deviceIds
deviceIds: [0,1,2,3] # sets CUDA_VISIBLE_DEVICES

torchrun:
    nproc-per-node: 4

# TorchServe back-end parameters
deepspeed:
    config: ds-config.json
    checkpoint: checkpoints.json
handler:
    # parameters for custom handler code
    model_name: "facebook/opt-30b"
    model_path: "model/models--facebook--opt-30b/snapshots/ceea0a90ac0f6fae7c2c34bcb40477438c152546"
    max_length: 50
    max_new_tokens: 10
    manual_seed: 40
```
Customize handlers
TorchServe offers base handlers and handler utilities for large model inference. The
following example shows a custom handler class that extends the base DeepSpeed handler. For
the complete code, see the custom_handler.py code on the PyTorch GitHub documentation.
```
class TransformersSeqClassifierHandler(BaseDeepSpeedHandler, ABC):
    """
    Transformers handler class for sequence, token classification and question answering.
    """

    def __init__(self):
        super(TransformersSeqClassifierHandler, self).__init__()
        self.max_length = None
        self.max_new_tokens = None
        self.tokenizer = None
        self.initialized = False

    def initialize(self, ctx: Context):
        """In this initialize function, the HF large model is loaded and
        partitioned using DeepSpeed.
        Args:
            ctx (context): It is a JSON Object containing information
            pertaining to the model artifacts parameters.
        """
        super().initialize(ctx)
        model_dir = ctx.system_properties.get("model_dir")
        self.max_length = int(ctx.model_yaml_config["handler"]["max_length"])
        self.max_new_tokens = int(ctx.model_yaml_config["handler"]["max_new_tokens"])
        model_name = ctx.model_yaml_config["handler"]["model_name"]
        model_path = ctx.model_yaml_config["handler"]["model_path"]
        seed = int(ctx.model_yaml_config["handler"]["manual_seed"])
        torch.manual_seed(seed)

        logger.info("Model %s loading tokenizer", ctx.model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.tokenizer.pad_token = self.tokenizer.eos_token

        config = AutoConfig.from_pretrained(model_name)
        with torch.device("meta"):
            self.model = AutoModelForCausalLM.from_config(
                config, torch_dtype=torch.float16
            )
        self.model = self.model.eval()

        ds_engine = get_ds_engine(self.model, ctx)
        self.model = ds_engine.module
        logger.info("Model %s loaded successfully", ctx.model_name)
        self.initialized = True

    def preprocess(self, requests):
        """
        Basic text preprocessing, based on the user's choice of application mode.
        Args:
            requests (list): A list of dictionaries with a "data" or "body" field, each
                             containing the input text to be processed.
        Returns:
            tuple: A tuple with two tensors: the batch of input ids and the batch of
                   attention masks.
        """

    def inference(self, input_batch):
        """
        Predicts the class (or classes) of the received text using the serialized
        transformers checkpoint.
        Args:
            input_batch (tuple): A tuple with two tensors: the batch of input ids and the
                                 batch of attention masks, as returned by the preprocess function.
        Returns:
            list: A list of strings with the predicted values for each input text in the batch.
        """

    def postprocess(self, inference_output):
        """Post Process Function converts the predicted response into Torchserve readable format.
        Args:
            inference_output (list): It contains the predicted response of the input text.
        Returns:
            (list): Returns a list of the Predictions and Explanations.
        """
```
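The preprocess, inference, and postprocess bodies are omitted above; see the linked custom_handler.py for the complete implementation. Purely as a sketch, and not the official implementation, a preprocess step for this handler could decode and tokenize the incoming text roughly as follows, assuming each request carries UTF-8 text in its "data" or "body" field:

```
    def preprocess(self, requests):
        # Sketch only: decode each request's text payload
        input_texts = []
        for req in requests:
            data = req.get("data") or req.get("body")
            if isinstance(data, (bytes, bytearray)):
                data = data.decode("utf-8")
            input_texts.append(data)

        # Tokenize the batch, padding/truncating to the configured max_length
        batch = self.tokenizer(
            input_texts,
            max_length=self.max_length,
            padding=True,
            truncation=True,
            return_tensors="pt",
        )
        return batch["input_ids"], batch["attention_mask"]
```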
Prepare your model artifacts
Before deploying your model on SageMaker AI, you must package your model artifacts. For large
models, we recommend that you use the PyTorch torch-model-archiver tool with the option
--archive-format no-archive, which skips compressing model artifacts. The following example
saves all of the model artifacts to a new folder named opt/.
```
torch-model-archiver --model-name opt --version 1.0 --handler custom_handler.py --extra-files ds-config.json -r requirements.txt --config-file opt/model-config.yaml --archive-format no-archive
```
Once the opt/ folder is created, download the OPT-30b model to the folder
using the PyTorch Download_model.py script, as shown in the following example.
```
cd opt
python path_to/Download_model.py --model_path model --model_name facebook/opt-30b --revision main
```
Lastly, upload the model artifacts to an Amazon S3 bucket.
```
aws s3 cp opt {your_s3_bucket}/opt --recursive
```
You should now have model artifacts stored in Amazon S3 that are ready to deploy to a SageMaker AI endpoint.
Deploy the model using the SageMaker Python SDK
After preparing your model artifacts, you can deploy your model to a SageMaker AI Hosting endpoint. This section describes how to deploy a single large model to an endpoint and make streaming response predictions. For more information about streaming responses from endpoints, see Invoke real-time endpoints.
To deploy your model, complete the following steps:
-
Create a SageMaker AI session, as shown in the following example.
```
import boto3
import sagemaker
from sagemaker import Model, image_uris, serializers, deserializers

boto3_session = boto3.session.Session(region_name="us-west-2")
smr = boto3.client('sagemaker-runtime')
sm = boto3.client('sagemaker')
role = sagemaker.get_execution_role()  # execution role for the endpoint
sess = sagemaker.session.Session(boto3_session, sagemaker_client=sm, sagemaker_runtime_client=smr)  # SageMaker AI session for interacting with different AWS APIs
region = sess._region_name  # region name of the current SageMaker Studio Classic environment
account = sess.account_id()  # account_id of the current SageMaker Studio Classic environment

# Configuration:
bucket_name = sess.default_bucket()
prefix = "torchserve"
output_path = f"s3://{bucket_name}/{prefix}"
print(f'account={account}, region={region}, role={role}, output_path={output_path}')
```
-
Create an uncompressed model in SageMaker AI, as shown in the following example.
```
from datetime import datetime

instance_type = "ml.g5.24xlarge"
endpoint_name = sagemaker.utils.name_from_base("ts-opt-30b")
s3_uri = {your_s3_bucket}/opt

model = Model(
    name="torchserve-opt-30b" + datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
    # Enable SageMaker uncompressed model artifacts
    model_data={
        "S3DataSource": {
            "S3Uri": s3_uri,
            "S3DataType": "S3Prefix",
            "CompressionType": "None",
        }
    },
    image_uri=container,
    role=role,
    sagemaker_session=sess,
    env={"TS_INSTALL_PY_DEP_PER_MODEL": "true"},
)
print(model)
```
-
Deploy the model to an Amazon EC2 instance, as shown in the following example.
```
model.deploy(
    initial_instance_count=1,
    instance_type=instance_type,
    endpoint_name=endpoint_name,
    volume_size=512,  # increase the size to store large model
    model_data_download_timeout=3600,  # increase the timeout to download large model
    container_startup_health_check_timeout=600,  # increase the timeout to load large model
)
```
-
Initialize a class to process the streaming response, as shown in the following example.
````
import io


class Parser:
    """
    A helper class for parsing the byte stream input.

    The output of the model will be in the following format:
    ```
    b'{"outputs": [" a"]}\n'
    b'{"outputs": [" challenging"]}\n'
    b'{"outputs": [" problem"]}\n'
    ...
    ```

    While usually each PayloadPart event from the event stream will contain a byte array
    with a full json, this is not guaranteed and some of the json objects may be split across
    PayloadPart events. For example:
    ```
    {'PayloadPart': {'Bytes': b'{"outputs": '}}
    {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}
    ```

    This class accounts for this by concatenating bytes written via the 'write' function
    and then exposing a method which will return lines (ending with a '\n' character) within
    the buffer via the 'scan_lines' function. It maintains the position of the last read
    position to ensure that previous bytes are not exposed again.
    """

    def __init__(self):
        self.buff = io.BytesIO()
        self.read_pos = 0

    def write(self, content):
        self.buff.seek(0, io.SEEK_END)
        self.buff.write(content)

    def scan_lines(self):
        self.buff.seek(self.read_pos)
        for line in self.buff.readlines():
            # Only yield complete lines (ending with '\n'); keep partial lines buffered
            if line[-1:] == b'\n':
                self.read_pos += len(line)
                yield line[:-1]

    def reset(self):
        self.read_pos = 0
````
-
Test a streaming response prediction, as shown in the following example.
```
import json

body = "Today the weather is really nice and I am planning on".encode('utf-8')
resp = smr.invoke_endpoint_with_response_stream(EndpointName=endpoint_name, Body=body, ContentType="application/json")
event_stream = resp['Body']
parser = Parser()
for event in event_stream:
    parser.write(event['PayloadPart']['Bytes'])
    for line in parser.scan_lines():
        print(line.decode("utf-8"), end=' ')
```
You have now deployed your model to a SageMaker AI endpoint and should be able to invoke it for responses. For more information about SageMaker AI real-time endpoints, see Single-model endpoints.
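The endpoint continues to incur charges while it is running. When you are done testing, you can delete the resources that this tutorial created. The following is a minimal cleanup sketch using the sm client created earlier; it assumes the endpoint configuration name defaults to the endpoint name, so adjust the names if you changed the defaults.

```
# Clean up to avoid ongoing charges (sketch; adjust names if you changed the defaults)
sm.delete_endpoint(EndpointName=endpoint_name)
sm.delete_endpoint_config(EndpointConfigName=endpoint_name)  # assumes the config name matches the endpoint name
sm.delete_model(ModelName=model.name)
```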