torchserve/inf2/llama2/llama-2-13b.ipynb (736 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "id": "0524d4b4", "metadata": {}, "source": [ "# Using TorchServe on SageMaker Inf2.24xlarge with Llama2-13B" ] }, { "cell_type": "markdown", "id": "92a529f6", "metadata": {}, "source": [ "## Contents\n", "\n", "This notebook uses SageMaker notebook instance conda_python3 kernel, demonstrates how to use TorchServe to deploy Llama-2-13 on SageMaker inf2.24xlarge. There are multiple advanced features in this example.\n", "\n", "* Neuronx AOT precompile model\n", "* TorchServe microbatching\n", "* TorchServe LLM batching streaming respone on SageMaker\n", "* SageMaker uncompressed model artifacts" ] }, { "cell_type": "markdown", "id": "9f19ac44", "metadata": {}, "source": [ "## Step 1: Let's bump up SageMaker and import stuff\n", "\n", "The wheel installed here is a private preview wheel, you need to add into allowlist to run this function" ] }, { "cell_type": "code", "execution_count": null, "id": "ff6901ce", "metadata": {}, "outputs": [], "source": [ "!python --version" ] }, { "cell_type": "code", "execution_count": null, "id": "fe8ab917", "metadata": {}, "outputs": [], "source": [ "# Install the latest aws cli v2 if it is not installed\n", "!curl \"https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip\" -o \"awscliv2.zip\"\n", "!unzip awscliv2.zip\n", "#Follow the instruction to install aws v2 on the terminal\n", "!cat aws/README.md" ] }, { "cell_type": "code", "execution_count": null, "id": "80f63ff2", "metadata": {}, "outputs": [], "source": [ "# Conform it is aws-cli/2.xx.xx\n", "!aws --version" ] }, { "cell_type": "code", "execution_count": null, "id": "a3d68be2", "metadata": { "tags": [] }, "outputs": [], "source": [ "#%pip install sagemaker pip --upgrade --quiet\n", "!pip install numpy\n", "!pip install pillow\n", "!pip install -U sagemaker\n", "!pip install -U boto \n", "!pip install -U botocore\n", "!pip install -U boto3" ] }, { "cell_type": "code", "execution_count": null, "id": "4874a3bb", "metadata": { "tags": [] }, "outputs": [], "source": [ "import boto3\n", "import sagemaker\n", "from sagemaker import Model, image_uris, serializers, deserializers\n", "\n", "boto3_session=boto3.session.Session(region_name=\"us-west-2\")\n", "smr = boto3.client('sagemaker-runtime-demo')\n", "sm = boto3.client('sagemaker')\n", "role = sagemaker.get_execution_role() # execution role for the endpoint\n", "sess= sagemaker.session.Session(boto3_session, sagemaker_client=sm, sagemaker_runtime_client=smr) # sagemaker session for interacting with different AWS APIs\n", "region = sess._region_name # region name of the current SageMaker Studio environment\n", "account = sess.account_id() # account_id of the current SageMaker Studio environment\n", "\n", "# Configuration:\n", "bucket_name = sess.default_bucket()\n", "prefix = \"torchserve\"\n", "output_path = f\"s3://{bucket_name}/{prefix}\"\n", "print(f'account={account}, region={region}, role={role}, output_path={output_path}')" ] }, { "cell_type": "markdown", "id": "306c2c9a", "metadata": {}, "source": [ "## Step 2: Build a BYOD TorchServe Docker container and push it to Amazon ECR" ] }, { "cell_type": "code", "execution_count": null, "id": "10a00114", "metadata": {}, "outputs": [], "source": [ "# Install our own dependencies\n", "!cat workspace/docker/Dockerfile" ] }, { "cell_type": "code", "execution_count": 2, "id": "b3c82d3e", "metadata": {}, "outputs": [], "source": [ "%%capture build_output\n", "\n", "baseimage = \"763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference-neuronx:1.13.1-neuronx-py310-sdk2.13.2-ubuntu20.04\"\n", "reponame = \"neuronx\"\n", "versiontag = \"2-13-2\"\n", "\n", "# Build our own docker image\n", "!cd workspace/docker && ./build_and_push.sh {reponame} {versiontag} {baseimage} {region} {account}" ] }, { "cell_type": "code", "execution_count": null, "id": "02c84373", "metadata": {}, "outputs": [], "source": [ "if 'Error response from daemon' in str(build_output):\n", " print(build_output)\n", " raise SystemExit('\\n\\n!!There was an error with the container build!!')\n", "else:\n", " container = str(build_output).strip().split('\\n')[-1]\n", "\n", "print(container)" ] }, { "cell_type": "markdown", "id": "ecb9a2ad", "metadata": {}, "source": [ "## Step 3: AOT Pre-Compile Model on EC2 \n", "\n", "This step is to precompile model with batchSize and save it in cache dir to use it in later model loading. It can significantly reduce model loading latency to a few seconds. This example already ran this step and saved the model artifacts in **TorchServe `model zoo: s3://torchserve/mar_files/sm-neuronx/llama-2-13b-neuronx-b1/`**. You can **skip this step, directly jump to option 2 of Step 4 if you want to use it**. \n", "\n", "### Precompile the model at local EC2\n", "\n", "Follow [Instruction](https://github.com/pytorch/serve/blob/master/examples/large_models/inferentia2/llama2/Readme.md?plain=1#L56) at local EC2 to precompile the model. The precompiled model will be stored in `model_store/llama-2-13b/neuron_cache`\n", "\n", "#### Main steps on EC2\n", " \n", "* download llama-2-13b from HF \n", "* Save the model split checkpoints compatible with `transformers-neuronx`\n", "* Create EC2 model artifacts at local\n", "* Start TorchServe to load the model\n", "* Precompile model and save it in `model_store/llama-2-13b/neuron_cache`\n", "* Run inference to test" ] }, { "cell_type": "markdown", "id": "716d62e7", "metadata": {}, "source": [ "#### Keynotes\n", "\n", "1. **Turn on neuron_cache** in function initialize of inf2_handler.py\n", "```\n", "os.environ[\"NEURONX_CACHE\"] = \"on\"\n", "os.environ[\"NEURONX_DUMP_TO\"] = f\"{model_dir}/neuron_cache\"\n", "```\n", "\n", "2. Set **micro_batch_size: 1** in model-config.yaml to precompile the model with batch size 1 in this example" ] }, { "cell_type": "code", "execution_count": 3, "id": "0b45e0e6", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "minWorkers: 1\r\n", "maxWorkers: 1\r\n", "maxBatchDelay: 100\r\n", "responseTimeout: 10800\r\n", "batchSize: 16\r\n", "\r\n", "handler:\r\n", " model_checkpoint_dir: \"llama-2-13b-split\"\r\n", " amp: \"bf16\"\r\n", " tp_degree: 12 \r\n", " max_length: 100\r\n", "\r\n", "micro_batching:\r\n", " micro_batch_size: 1\r\n", " parallelism:\r\n", " preprocess: 2\r\n", " inference: 1\r\n", " postprocess: 2\r\n" ] } ], "source": [ "!cat workspace/model-config.yaml" ] }, { "cell_type": "markdown", "id": "7e303d88", "metadata": {}, "source": [ "3. The generated cache files are **tightly coupled with neuronx SDK version**\n", "```\n", "This example is based on Neuron DLC 2.13.2: 763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference-neuronx:1.13.1-neuronx-py310-sdk2.13.2-ubuntu20.04\n", "```" ] }, { "cell_type": "markdown", "id": "fd48ab5d", "metadata": {}, "source": [ "## Step 4: Upload model artifacts to S3" ] }, { "cell_type": "markdown", "id": "e115df40", "metadata": {}, "source": [ "You can choose either option 1 or option 2.\n", "\n", "* option 1: Sample command to copy llama-2-13b from EC2 to S3\n", "```\n", "print(output_path)\n", "!aws s3 cp llama-2-13b {output_path}/llama-2-13b --recursive\n", "s3_uri = f\"{output_path}/llama-2-13b-neuronx-b4/\"\n", "```\n", "\n", "* option 2: The model artifacts of this example is available in **TorchServe Model Zoo `s3://torchserve/mar_files/sm-neuronx/llama-2-13b-neuronx-b1/`** which supports llama2-13b on neuronx batchSize = 1 based on torch-neuronx==1.13.1.1.10.1. You can copy it to your SM S3 model repo. For example" ] }, { "cell_type": "code", "execution_count": null, "id": "7cafa5cf", "metadata": {}, "outputs": [], "source": [ "!aws s3 cp s3://torchserve/mar_files/sm-neuronx/llama-2-13b-neuronx-b1/ llama-2-13b-neuronx-b1 --recursive" ] }, { "cell_type": "code", "execution_count": null, "id": "57d4e190", "metadata": {}, "outputs": [], "source": [ "!aws s3 cp llama-2-13b-neuronx-b1 {output_path}/llama-2-13b-neuronx-b1 --recursive" ] }, { "cell_type": "code", "execution_count": null, "id": "aa339407", "metadata": {}, "outputs": [], "source": [ "s3_uri = f\"{output_path}/llama-2-13b-neuronx-b1/\"\n", "print(s3_uri)" ] }, { "cell_type": "markdown", "id": "329c259c", "metadata": {}, "source": [ "## Step 5: Start building SageMaker endpoint\n", "In this step, we will build SageMaker endpoint from scratch" ] }, { "cell_type": "markdown", "id": "0771053b", "metadata": {}, "source": [ "### Create SageMaker endpoint\n", "\n", "You need to specify the instance to use and endpoint names" ] }, { "cell_type": "code", "execution_count": 20, "id": "8bcfdfb8", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "<sagemaker.model.Model object at 0x7fda64bf7040>\n" ] } ], "source": [ "from datetime import datetime\n", "\n", "instance_type = \"ml.inf2.24xlarge\"\n", "endpoint_name = sagemaker.utils.name_from_base(\"ts-inf2-llama2-13b-b1\")\n", "\n", "model = Model(\n", " name=\"torchserve-inf2-llama2-13b\" + datetime.now().strftime(\"%Y-%m-%d-%H-%M-%S\"),\n", " # Enable SageMaker uncompressed model artifacts\n", " model_data={\n", " \"S3DataSource\": {\n", " \"S3Uri\": s3_uri,\n", " \"S3DataType\": \"S3Prefix\",\n", " \"CompressionType\": \"None\",\n", " }\n", " },\n", " image_uri=container,\n", " role=role,\n", " sagemaker_session=sess,\n", " env={\"TS_INSTALL_PY_DEP_PER_MODEL\": \"true\"},\n", ")\n", "print(model)" ] }, { "cell_type": "code", "execution_count": 21, "id": "53eaa416", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Your model is not compiled. Please compile your model before using Inferentia.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "---------------!" ] } ], "source": [ "model.deploy(\n", " initial_instance_count=1,\n", " instance_type=instance_type,\n", " endpoint_name=endpoint_name,\n", " volume_size=512, # increase the size to store large model\n", " model_data_download_timeout=3600, # increase the timeout to download large model\n", " container_startup_health_check_timeout=600, # increase the timeout to load large model\n", ")" ] }, { "cell_type": "markdown", "id": "39513f65", "metadata": {}, "source": [ "## Run the inference with streaming response" ] }, { "cell_type": "markdown", "id": "602383fc", "metadata": {}, "source": [ "### TorchServe streaming response sample code\n", "\n", "TorchServe [TextIteratorStreamerBatch](https://github.com/pytorch/serve/blob/d0ae857abfe6d36813c88e531316149a5a354a93/ts/handler_utils/hf_batch_streamer.py#L7C7-L7C32) extends HF BaseStream to support a batch of inference requests. It streaming response via [send_intermediate_predict_response](https://github.com/pytorch/serve/blob/d0ae857abfe6d36813c88e531316149a5a354a93/ts/protocol/otf_message_handler.py#L355). " ] }, { "cell_type": "code", "execution_count": 5, "id": "4f92eb42", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "import logging\r\n", "import os\r\n", "from abc import ABC\r\n", "from threading import Thread\r\n", "\r\n", "import torch_neuronx\r\n", "from transformers import AutoConfig, LlamaTokenizer\r\n", "from transformers_neuronx.generation_utils import HuggingFaceGenerationModelAdapter\r\n", "from transformers_neuronx.llama.model import LlamaForSampling\r\n", "\r\n", "from ts.handler_utils.hf_batch_streamer import TextIteratorStreamerBatch\r\n", "from ts.handler_utils.micro_batching import MicroBatching\r\n", "from ts.protocol.otf_message_handler import send_intermediate_predict_response\r\n", "from ts.torch_handler.base_handler import BaseHandler\r\n", "\r\n", "logger = logging.getLogger(__name__)\r\n", "\r\n", "\r\n", "class LLMHandler(BaseHandler, ABC):\r\n", " \"\"\"\r\n", " Transformers handler class for text completion streaming on Inferentia2\r\n", " \"\"\"\r\n", "\r\n", " def __init__(self):\r\n", " super().__init__()\r\n", " self.initialized = False\r\n", " self.max_length = None\r\n", " self.tokenizer = None\r\n", " self.output_streamer = None\r\n", " # enable micro batching\r\n", " self.handle = MicroBatching(self)\r\n", "\r\n", " def initialize(self, ctx):\r\n", " self.manifest = ctx.manifest\r\n", " properties = ctx.system_properties\r\n", " model_dir = properties.get(\"model_dir\")\r\n", " model_checkpoint_dir = ctx.model_yaml_config.get(\"handler\", {}).get(\r\n", " \"model_checkpoint_dir\", \"\"\r\n", " )\r\n", " model_checkpoint_path = f\"{model_dir}/{model_checkpoint_dir}\"\r\n", " os.environ[\"NEURONX_CACHE\"] = \"on\"\r\n", " os.environ[\"NEURONX_DUMP_TO\"] = f\"{model_dir}/neuron_cache\"\r\n", " os.environ[\"NEURON_CC_FLAGS\"] = \"--model-type=transformer-inference\"\r\n", "\r\n", " # micro batching initialization\r\n", " micro_batching_parallelism = ctx.model_yaml_config.get(\r\n", " \"micro_batching\", {}\r\n", " ).get(\"parallelism\", None)\r\n", " if micro_batching_parallelism:\r\n", " logger.info(\r\n", " f\"Setting micro batching parallelism from model_config_yaml: {micro_batching_parallelism}\"\r\n", " )\r\n", " self.handle.parallelism = micro_batching_parallelism\r\n", "\r\n", " micro_batch_size = ctx.model_yaml_config.get(\"micro_batching\", {}).get(\r\n", " \"micro_batch_size\", 1\r\n", " )\r\n", " logger.info(f\"Setting micro batching size: {micro_batch_size}\")\r\n", " self.handle.micro_batch_size = micro_batch_size\r\n", "\r\n", " # settings for model compiliation and loading\r\n", " amp = ctx.model_yaml_config.get(\"handler\", {}).get(\"amp\", \"f32\")\r\n", " tp_degree = ctx.model_yaml_config.get(\"handler\", {}).get(\"tp_degree\", 6)\r\n", " self.max_length = ctx.model_yaml_config.get(\"handler\", {}).get(\"max_length\", 50)\r\n", "\r\n", " # allocate \"tp_degree\" number of neuron cores to the worker process\r\n", " os.environ[\"NEURON_RT_NUM_CORES\"] = str(tp_degree)\r\n", " try:\r\n", " num_neuron_cores_available = (\r\n", " torch_neuronx.xla_impl.data_parallel.device_count()\r\n", " )\r\n", " assert num_neuron_cores_available >= int(tp_degree)\r\n", " except (RuntimeError, AssertionError) as error:\r\n", " logger.error(\r\n", " \"Required number of neuron cores for tp_degree \"\r\n", " + str(tp_degree)\r\n", " + \" are not available: \"\r\n", " + str(error)\r\n", " )\r\n", "\r\n", " raise error\r\n", "\r\n", " self.tokenizer = LlamaTokenizer.from_pretrained(model_checkpoint_path)\r\n", " self.tokenizer.pad_token = self.tokenizer.eos_token\r\n", " self.model = LlamaForSampling.from_pretrained(\r\n", " model_checkpoint_path,\r\n", " batch_size=self.handle.micro_batch_size,\r\n", " amp=amp,\r\n", " tp_degree=tp_degree,\r\n", " )\r\n", " logger.info(\"Starting to compile the model\")\r\n", " self.model.to_neuron()\r\n", " logger.info(\"Model has been successfully compiled\")\r\n", " model_config = AutoConfig.from_pretrained(model_checkpoint_path)\r\n", " self.model = HuggingFaceGenerationModelAdapter(model_config, self.model)\r\n", " self.output_streamer = TextIteratorStreamerBatch(\r\n", " self.tokenizer,\r\n", " batch_size=self.handle.micro_batch_size,\r\n", " skip_special_tokens=True,\r\n", " )\r\n", "\r\n", " self.initialized = True\r\n", "\r\n", " def preprocess(self, requests):\r\n", " input_text = []\r\n", " for req in requests:\r\n", " data = req.get(\"data\") or req.get(\"body\")\r\n", " if isinstance(data, (bytes, bytearray)):\r\n", " data = data.decode(\"utf-8\")\r\n", " logger.info(f\"received req={data}\")\r\n", " input_text.append(data.strip())\r\n", "\r\n", " # Ensure the compiled model can handle the input received\r\n", " if len(input_text) > self.handle.micro_batch_size:\r\n", " raise ValueError(\r\n", " f\"Model is compiled for batch size {self.handle.micro_batch_size} but received input of size {len(input_text)}\"\r\n", " )\r\n", "\r\n", " # Pad input to match compiled model batch size\r\n", " input_text.extend([\"\"] * (self.handle.micro_batch_size - len(input_text)))\r\n", "\r\n", " return self.tokenizer(input_text, return_tensors=\"pt\", padding=True)\r\n", "\r\n", " def inference(self, tokenized_input):\r\n", " generation_kwargs = dict(\r\n", " tokenized_input,\r\n", " streamer=self.output_streamer,\r\n", " max_new_tokens=self.max_length,\r\n", " )\r\n", " self.model.reset_generation()\r\n", " thread = Thread(target=self.model.generate, kwargs=generation_kwargs)\r\n", " thread.start()\r\n", "\r\n", " micro_batch_idx = self.handle.get_micro_batch_idx()\r\n", " micro_batch_req_id_map = self.get_micro_batch_req_id_map(micro_batch_idx)\r\n", " for new_text in self.output_streamer:\r\n", " logger.debug(\"send response stream\")\r\n", " send_intermediate_predict_response(\r\n", " new_text[: len(micro_batch_req_id_map)],\r\n", " micro_batch_req_id_map,\r\n", " \"Intermediate Prediction success\",\r\n", " 200,\r\n", " self.context,\r\n", " )\r\n", "\r\n", " thread.join()\r\n", "\r\n", " return [\"\"] * len(micro_batch_req_id_map)\r\n", "\r\n", " def postprocess(self, inference_output):\r\n", " return inference_output\r\n", "\r\n", " def get_micro_batch_req_id_map(self, micro_batch_idx: int):\r\n", " start_idx = micro_batch_idx * self.handle.micro_batch_size\r\n", " micro_batch_req_id_map = {\r\n", " index: self.context.request_ids[batch_index]\r\n", " for index, batch_index in enumerate(\r\n", " range(start_idx, start_idx + self.handle.micro_batch_size)\r\n", " )\r\n", " if batch_index in self.context.request_ids\r\n", " }\r\n", "\r\n", " return micro_batch_req_id_map\r\n" ] } ], "source": [ "!cat workspace/inf2_handler.py" ] }, { "cell_type": "markdown", "id": "c89453f2", "metadata": {}, "source": [ "### SageMaker streaming response" ] }, { "cell_type": "code", "execution_count": 22, "id": "360839a4", "metadata": { "tags": [] }, "outputs": [], "source": [ "import io\n", "\n", "class Parser:\n", " \"\"\"\n", " A helper class for parsing the byte stream input. \n", " \n", " The output of the model will be in the following format:\n", " ```\n", " b'{\"outputs\": [\" a\"]}\\n'\n", " b'{\"outputs\": [\" challenging\"]}\\n'\n", " b'{\"outputs\": [\" problem\"]}\\n'\n", " ...\n", " ```\n", " \n", " While usually each PayloadPart event from the event stream will contain a byte array \n", " with a full json, this is not guaranteed and some of the json objects may be split across\n", " PayloadPart events. For example:\n", " ```\n", " {'PayloadPart': {'Bytes': b'{\"outputs\": '}}\n", " {'PayloadPart': {'Bytes': b'[\" problem\"]}\\n'}}\n", " ```\n", " \n", " This class accounts for this by concatenating bytes written via the 'write' function\n", " and then exposing a method which will return lines (ending with a '\\n' character) within\n", " the buffer via the 'scan_lines' function. It maintains the position of the last read \n", " position to ensure that previous bytes are not exposed again. \n", " \"\"\"\n", " \n", " def __init__(self):\n", " self.buff = io.BytesIO()\n", " self.read_pos = 0\n", " \n", " def write(self, content):\n", " self.buff.seek(0, io.SEEK_END)\n", " self.buff.write(content)\n", " data = self.buff.getvalue()\n", " \n", " def scan_lines(self):\n", " self.buff.seek(self.read_pos)\n", " for line in self.buff.readlines():\n", " if line[-1] != b'\\n':\n", " self.read_pos += len(line)\n", " yield line[:-1]\n", " \n", " def reset(self):\n", " self.read_pos = 0" ] }, { "cell_type": "code", "execution_count": 23, "id": "7f760056", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Today the weather is really nice and I am planning on going to the beach. I am going to take my camera and take some pictures of the beach. I am going to take pictures of the sand, the water, and the people. I am also going to take pictures of the sunset. I am really excited to go to the beach and take pictures. The beach is a great place to take pictures. The sand, the water, and the people are all great subjects for pictures. The sunset is also a great subject for pictures " ] } ], "source": [ "import json\n", "\n", "body = \"Today the weather is really nice and I am planning on\".encode('utf-8')\n", "resp = smr.invoke_endpoint_with_response_stream(EndpointName=endpoint_name, Body=body, ContentType=\"application/json\")\n", "event_stream = resp['Body']\n", "parser = Parser()\n", "for event in event_stream:\n", " parser.write(event['PayloadPart']['Bytes'])\n", " for line in parser.scan_lines():\n", " print(line.decode(\"utf-8\"), end=' ')" ] }, { "cell_type": "markdown", "id": "46b2e7b4", "metadata": {}, "source": [ "## Clean up the environment" ] }, { "cell_type": "code", "execution_count": null, "id": "f23bcaf4", "metadata": { "tags": [] }, "outputs": [], "source": [ "sess.delete_endpoint(endpoint_name)\n", "sess.delete_endpoint_config(endpoint_name)\n", "model.delete_model()" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.8" } }, "nbformat": 4, "nbformat_minor": 5 }