benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py (856 lines of code) (raw):

r"""Benchmark LLM serving throughput and latency. This script is for sending requests with prompts to LLM server and benchmark the latency and throughput at various request rates. It is a modified version of https://github.com/vllm-project/vllm/blob/main/benchmarks/benchmark_serving.py. It currently supports TGI, vLLM, Triton TensorRT-LLM and Saxml. """ import argparse import asyncio from datetime import datetime import json import random import requests import time from typing import AsyncGenerator, List, Optional, Tuple, Dict from prometheus_client import start_http_server, Histogram, Gauge import google.auth import google.auth.transport.requests from google.cloud import storage import aiohttp import numpy as np from transformers import AutoTokenizer from transformers import PreTrainedTokenizerBase from google.protobuf.timestamp_pb2 import Timestamp MIN_SEQ_LEN = 4 NEW_TEXT_KEY = "\nOutput:\n" PROMETHEUS_PORT = 9090 # Prometheus Metrics prompt_length_metric = Histogram("LatencyProfileGenerator:prompt_length", "Input prompt length", buckets=[2**i for i in range(1, 16)]) response_length_metric = Histogram("LatencyProfileGenerator:response_length", "Response length", buckets=[2**i for i in range(1, 16)]) request_latency_per_output_token_metric = Histogram('LatencyProfileGenerator:request_latency_per_output_token', 'Time per output token per request (including first token)') tpot_metric = Histogram('LatencyProfileGenerator:time_per_output_token', 'Time per output token per request (excluding first token)') ttft_metric = Histogram('LatencyProfileGenerator:time_to_first_token', 'Time to first token per request') active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed') # Add trace config for monitoring in flight requests async def on_request_start(session, trace_config_ctx, params): active_requests_metric.inc() async def on_request_end(session, trace_config_ctx, params): active_requests_metric.dec() trace_config = aiohttp.TraceConfig() trace_config.on_request_start.append(on_request_start) trace_config.on_request_end.append(on_request_end) # Google Cloud Storage Client gcs_client = None gcs_bucket = None def get_filtered_dataset( dataset_path: str, max_input_len: int, max_output_len: int, tokenizer: PreTrainedTokenizerBase, use_dummy_text: bool, ) -> List[Tuple[str, int, int]]: """Samples requests from the dataset or creates dummy requests.""" if use_dummy_text: dummy_prompt_token_ids = [0] * max_input_len dummy_prompt = tokenizer.decode(dummy_prompt_token_ids) return [( dummy_prompt, max_input_len, max_output_len, )] # Load the dataset. with open(dataset_path) as f: dataset = json.load(f) # Filter out the conversations with less than 2 turns. dataset = [data for data in dataset if len(data["conversations"]) >= 2] # Only keep the first two turns of each conversation. dataset = [ (data["conversations"][0]["value"], data["conversations"][1]["value"]) for data in dataset ] # Tokenize the prompts and completions. prompts = [prompt for prompt, _ in dataset] prompt_token_ids = tokenizer(prompts).input_ids completions = [completion for _, completion in dataset] completion_token_ids = tokenizer(completions).input_ids tokenized_dataset = [] for i in range(len(dataset)): output_len = len(completion_token_ids[i]) tokenized_dataset.append((prompts[i], prompt_token_ids[i], output_len)) # Filter out too long sequences. filtered_dataset: List[Tuple[str, int, int]] = [] for prompt, prompt_token_ids, output_len in tokenized_dataset: prompt_len = len(prompt_token_ids) if prompt_len < MIN_SEQ_LEN or output_len < MIN_SEQ_LEN: # Prune too short sequences. # This is because TGI causes errors when the input or output length # is too short. continue if prompt_len > max_input_len or output_len > max_output_len: # Prune too long sequences. continue filtered_dataset.append((prompt, prompt_len, output_len)) return filtered_dataset async def generate_next_request( input_requests: List[Tuple[str, int, int]], request_rate: float, ) -> AsyncGenerator[Tuple[str, int, int], None]: """Gets request async.""" while True: request = random.choice(input_requests) yield request if request_rate == float("inf"): # If the request rate is infinity, then we don't need to wait. continue # Sample the request interval from the exponential distribution. interval = np.random.exponential(1.0 / request_rate) # The next request will be sent after the interval. await asyncio.sleep(interval) def init_errors_map() -> Dict[str, int]: errors = { "ClientConnectorError": 0, "TimeoutError": 0, "ContentTypeError": 0, "ClientOSError": 0, "ServerDisconnectedError": 0, "unknown_error": 0, } return errors async def send_stream_request( backend: str, api_url: str, prompt: str, prompt_len: int, output_len: int, best_of: int, use_beam_search: bool, top_k: int, tokenizer: PreTrainedTokenizerBase, sax_model: str, model: str, timeout: float, ) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]: """Sends stream request to server""" request_start_time = time.time() errors = init_errors_map() headers = {"User-Agent": "Benchmark Client"} if backend == "vllm": pload = { "model": model, "prompt": prompt, "n": 1, "best_of": best_of, "use_beam_search": use_beam_search, "temperature": 0.0 if use_beam_search else 1.0, "top_p": 1.0, "max_tokens": output_len, "ignore_eos": True, "stream": True, } elif backend == "jetstream": pload = { "prompt": prompt, "max_tokens": output_len, "stream": True, } else: raise ValueError(f"Unknown backend: {backend}") ttft = 0.0 st = time.perf_counter() output = "" timeout = aiohttp.ClientTimeout(total=timeout) async with aiohttp.ClientSession(timeout=timeout,trust_env=True) as session: try: async with session.post(api_url, headers=headers, json=pload, ssl=False) as response: async for chunk_bytes in response.content.iter_chunks(): chunk_bytes = chunk_bytes[0].strip() if not chunk_bytes: continue timestamp = time.perf_counter() # First token if ttft == 0.0: ttft = timestamp - st if backend == "vllm": if chunk_bytes.decode("utf-8")[6:] != "[DONE]": output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"] elif backend == "jetstream": if chunk_bytes.decode("utf-8") != "": output += json.loads(chunk_bytes.decode("utf-8"))["text"] except aiohttp.client_exceptions.ClientConnectorError as client_err: errors["ClientConnectorError"] += 1 print(f"ClientConnectorError: {client_err}") return None, None, errors except asyncio.TimeoutError as timeout_err: errors["TimeoutError"] += 1 print(f"TimeoutError: {timeout_err}") return None, None, errors except aiohttp.client_exceptions.ClientOSError as e: errors["ClientOSError"] += 1 print(f"ClientOSError: {e}") return None, None, errors except aiohttp.client_exceptions.ContentTypeError as e: print(f"ContentTypeError: {e}, response: {response}") errors["ContentTypeError"] += 1 return None, None, errors except aiohttp.client_exceptions.ServerDisconnectedError as e: errors["ServerDisconnectedError"] += 1 print(f"ServerDisconnectedError: {e}") return None, None, errors except Exception as e: print(f"Unknown error {e}") errors["unknown_error"] += 1 return None, None, errors request_end_time = time.time() output_token_ids = tokenizer(output).input_ids output_len = len(output_token_ids) request_latency = (prompt_len, output_len, (request_end_time - request_start_time)) # Exclude first token for tpot calculation if output_len > 1: tpot_metric.observe((request_end_time - ttft - request_start_time) / (output_len - 1)) request_latency_per_output_token_metric.observe((request_end_time - request_start_time) / output_len) if ttft is not None: ttft_metric.observe(ttft) prompt_length_metric.observe(prompt_len) response_length_metric.observe(output_len) return request_latency, ttft, None async def send_request( backend: str, api_url: str, prompt: str, prompt_len: int, output_len: int, best_of: int, use_beam_search: bool, top_k: int, tokenizer: PreTrainedTokenizerBase, sax_model: str, model: str, timeout: float, ) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]: """Sends request to server.""" request_start_time = time.time() errors = init_errors_map() headers = {"User-Agent": "Benchmark Client"} if backend == "vllm": pload = { "model": model, "prompt": prompt, "n": 1, "best_of": best_of, "use_beam_search": use_beam_search, "temperature": 0.0 if use_beam_search else 1.0, "top_p": 1.0, "max_tokens": output_len, "ignore_eos": False, "stream": False, } elif backend == "tgi": assert not use_beam_search params = { "best_of": best_of, "max_new_tokens": output_len, "do_sample": True, } pload = { "inputs": prompt, "parameters": params, } elif backend == "naive_transformers": # If max_length or top_k is not specified _MAX_LENGTH_DEFAULT = 200 and # _TOP_K_DEFAULT = 10 in peft/handler.py will be used. pload = { "instances": [{ "prompt": prompt, "max_length": output_len, "top_k": top_k, }] } elif backend == "tensorrt_llm_triton": pload = { "text_input": prompt, "max_tokens": output_len, "beam_width": 1 if not use_beam_search else best_of, "temperature": 0.0 if use_beam_search else 1.0, "top_p": 1.0, "bad_words": "", "stop_words": "", "stream": False, } elif backend == "sax": pload = { "model": sax_model, "prompt": prompt, "n": 1, "best_of": best_of, "use_beam_search": use_beam_search, "temperature": 0.0 if use_beam_search else 1.0, "top_p": 1.0, "top_k": 50, "max_tokens": output_len, "stream": False, } elif backend == "jetstream": pload = { "prompt": prompt, "max_tokens": output_len, } else: raise ValueError(f"Unknown backend: {backend}") # Set client timeout to be 3 hrs. timeout = aiohttp.ClientTimeout(total=timeout) async with aiohttp.ClientSession(timeout=timeout,trust_env=True,trace_configs=[trace_config]) as session: while True: try: async with session.post(api_url, headers=headers, json=pload, ssl=False) as response: output = await response.json() # Re-send the request if it failed. if "error" not in output: break except aiohttp.client_exceptions.ClientConnectorError as client_err: errors["ClientConnectorError"] += 1 print(f"ClientConnectorError: {client_err}") return None, None, errors except asyncio.TimeoutError as timeout_err: errors["TimeoutError"] += 1 print(f"TimeoutError: {timeout_err}") return None, None, errors except aiohttp.client_exceptions.ClientOSError as e: errors["ClientOSError"] += 1 print(f"ClientOSError: {e}") return None, None, errors except aiohttp.client_exceptions.ContentTypeError as e: print(f"ContentTypeError: {e}, response: {response}") errors["ContentTypeError"] += 1 return None, None, errors except aiohttp.client_exceptions.ServerDisconnectedError as e: errors["ServerDisconnectedError"] += 1 print(f"ServerDisconnectedError: {e}") return None, None, errors except Exception as e: print(f"Unknown error {e}") errors["unknown_error"] += 1 return None, None, errors request_end_time = time.time() # Naive HF transformers generation and TensorRT-LLM generation stops at EOS # tokens and the generation may be shorter than the ground-truth output # sequence length. if backend == "naive_transformers": complete_pred = output["predictions"][0][0]["generated_text"] new_text_start_index = complete_pred.find(NEW_TEXT_KEY) + len(NEW_TEXT_KEY) pred = complete_pred[new_text_start_index:] output_token_ids = tokenizer(pred).input_ids output_len = len(output_token_ids) - prompt_len elif backend == "tensorrt_llm_triton": output_token_ids = tokenizer(output["text_output"]).input_ids output_len = len(output_token_ids) elif backend == "sax": output_token_ids = tokenizer(output["choices"][0]["text"]).input_ids output_len = len(output_token_ids) elif backend == "tgi": output_token_ids = tokenizer(output["generated_text"]).input_ids output_len = len(output_token_ids) elif backend == "vllm": output_token_ids = tokenizer(output["choices"][0]["text"]).input_ids output_len = len(output_token_ids) elif backend == "jetstream": output_token_ids = tokenizer(output["response"]).input_ids output_len = len(output_token_ids) # (prompt len, output len, latency, success) request_latency = (prompt_len, output_len, (request_end_time - request_start_time)) request_latency_per_output_token_metric.observe((request_end_time - request_start_time) / output_len) prompt_length_metric.observe(prompt_len) response_length_metric.observe(output_len) return request_latency, None, None async def benchmark( args: argparse.Namespace, api_url: str, tokenizer: PreTrainedTokenizerBase, model: str, ) -> Tuple[List[Tuple[int, int, float]], List[float], Dict[str, int]]: """Runs benchmark with asynchronous requests.""" input_requests = get_filtered_dataset( args.dataset, args.max_input_length, args.max_output_length, tokenizer, args.use_dummy_text, ) benchmark_start_time = time.time() tasks: List[asyncio.Task] = [] prompts_sent: int = 0 async for request in generate_next_request(input_requests, args.request_rate): if args.num_prompts <= prompts_sent: break prompt, prompt_len, output_len = request if args.stream_request: task = asyncio.create_task( send_stream_request( args.backend, api_url, prompt, prompt_len, output_len, args.best_of, args.use_beam_search, args.top_k, tokenizer, args.sax_model, model, args.request_timeout, ) ) else: task = asyncio.create_task( send_request( args.backend, api_url, prompt, prompt_len, output_len, args.best_of, args.use_beam_search, args.top_k, tokenizer, args.sax_model, model, args.request_timeout, ) ) tasks.append(task) prompts_sent += 1 results = await asyncio.gather(*tasks) combined_latencies = [] combined_ttfts = [] combined_errors = init_errors_map() for latency, ttft, errors in results: if latency: combined_latencies.append(latency) if errors: for err, count in errors.items(): combined_errors[err] = combined_errors[err] + count if ttft: combined_ttfts.append(ttft) benchmark_duration = time.time() - benchmark_start_time print_and_save_result(args, benchmark_duration, prompts_sent, model, combined_latencies, combined_ttfts, combined_errors) return combined_latencies, combined_ttfts, combined_errors def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics, model, errors): # Setup start_dt_proto = Timestamp() start_dt_proto.FromDatetime(args.start_datetime) final_json = { # metrics values are numerical "metrics" : { # Traffic "num_prompts_attempted": benchmark_result['num_prompts_attempted'], "num_prompts_succeeded": benchmark_result['num_prompts_succeeded'], "request_rate": args.request_rate, 'server_metrics': { **server_metrics }, **benchmark_result, **errors, }, # dimensions values are strings "dimensions": { "date": args.start_datetime.strftime('%Y%m%d-%H%M%S'), "backend": args.backend, "model_id": model, "tokenizer_id": args.tokenizer, **(json.loads(args.additional_metadata_metrics_to_save) if args.additional_metadata_metrics_to_save else {}) }, "config": { "model": model, "num_models": len(args.models.split(',')), "model_server": args.backend, "start_time": { "seconds" : start_dt_proto.seconds, "nanos" : start_dt_proto.nanos } }, "summary_stats": { "stats": [{ "request_rate": args.request_rate, "request_latency": { "mean": benchmark_result["avg_latency"], "median": benchmark_result["median_latency"], "sd": benchmark_result["sd_latency"], "min": benchmark_result["min_latency"], "max": benchmark_result["max_latency"], "p90": benchmark_result["p90_latency"], "p99": benchmark_result["p99_latency"], }, "throughput": { "mean": benchmark_result['throughput'] }, "input_length": { "mean": benchmark_result["avg_input_len"], "median": benchmark_result["median_input_len"], "sd": benchmark_result["sd_input_len"], "min": benchmark_result["min_input_len"], "max": benchmark_result["max_input_len"], "p90": benchmark_result["p90_input_len"], "p99": benchmark_result["p99_input_len"], }, "output_length": { "mean": benchmark_result["avg_output_len"], "median": benchmark_result["median_output_len"], "sd": benchmark_result["sd_output_len"], "min": benchmark_result["min_output_len"], "max": benchmark_result["max_output_len"], "p90": benchmark_result["p90_output_len"], "p99": benchmark_result["p99_output_len"], }, "tpot": { "mean": benchmark_result["avg_per_output_token_latency"], "median": benchmark_result["median_per_output_token_latency"], "sd": benchmark_result["sd_per_output_token_latency"], "min": benchmark_result["min_per_output_token_latency"], "max": benchmark_result["max_per_output_token_latency"], "p90": benchmark_result["p90_per_output_token_latency"], "p99": benchmark_result["p99_per_output_token_latency"], }, "model_server_metrics" : [{"Name": name, **metrics} for name, metrics in server_metrics.items()] }] } } # Save to file model_without_slash = model.replace("/","-") file_name = ( f"{args.file_prefix}-{args.backend}-{args.request_rate}qps-{args.start_datetime.strftime('%Y%m%d-%H%M%S')}-{model_without_slash}.json" ) with open(file_name, "w", encoding="utf-8") as outfile: json.dump(final_json, outfile) if gcs_bucket is not None: try: gcs_bucket.blob(f"{args.output_bucket_filepath}/{file_name}").upload_from_filename(file_name) print(f"File {file_name} uploaded to gs://{args.output_bucket}/{args.output_bucket_filepath}") except google.cloud.exceptions.NotFound: print(f"GS Bucket (gs://{args.output_bucket}) does not exist") def metrics_to_scrape(backend: str) -> List[str]: # Each key in the map is a metric, it has a corresponding 'stats' object # It must be populated on the outputs 'metrics' field as 'key':'stats' # If a value is specified for a given key, it will be populated on the outputs `summary_stats.stats` field as 'value':'stats' as well. if backend == "vllm": return [ "vllm:gpu_cache_usage_perc", "vllm:num_requests_waiting", "vllm:num_requests_running", "vllm:num_requests_swapped", "vllm:time_to_first_token_seconds", "vllm:time_per_output_token_seconds", "vllm:request_queue_time_seconds", "vllm:request_inference_time_seconds", "vllm:request_prompt_tokens", "vllm:request_generation_tokens", "vllm:iteration_tokens_total", ] elif backend == "jetstream": return [ "jetstream_slots_used_percentage", "jetstream_prefill_backlog_size", ] else: return [] def print_metrics(metrics: List[str], duration: float, backend: str): # Creates a credentials object from the default service account file # Assumes that script has appropriate default credentials set up, ref: # https://googleapis.dev/python/google-auth/latest/user-guide.html#application-default-credentials credentials, project_id = google.auth.default() # Prepare an authentication request - helps format the request auth token auth_req = google.auth.transport.requests.Request() server_metrics = {} # Request refresh tokens credentials.refresh(auth_req) url='https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus/api/v1/metadata' % (project_id) headers_api = {'Authorization': 'Bearer ' + credentials.token} request_post = requests.get(url=url, headers=headers_api) all_metrics_metadata = request_post.json() if request_post.ok is not True: print("HTTP Error: %s" % (all_metrics_metadata)) if all_metrics_metadata["status"] != "success": print("Metadata error response: %s" % all_metrics_metadata["error"]) for metric in metrics: print("Metric Name: %s" % (metric)) # Find metric type metric_type = all_metrics_metadata['data'][metric] if all_metrics_metadata['data'][metric] is None: print("No metric found for: %s" % metric) return metric_type = metric_type[0]['type'] metric_results = {} # Queries scrape all metrics collected from the last $DURATION seconds from the backend's related # podmonitoring spec assumed to be named "$BACKEND-podmonitoring" queries = { "gauge": { "Mean": "avg_over_time(%s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), "Median": "quantile_over_time(0.5, %s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), "Sd": "stddev_over_time(%s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), "Min": "min_over_time(%s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), "Max": "max_over_time(%s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), "P90": "quantile_over_time(0.9, %s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), "P99": "quantile_over_time(0.99, %s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), }, "histogram": { "Mean": "sum(rate(%s_sum{job='%s-podmonitoring'}[%.0fs])) / sum(rate(%s_count{job='%s-podmonitoring'}[%.0fs]))" % (metric, backend, duration, metric, backend, duration), "Median": "histogram_quantile(0.5, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, backend, duration), "Min": "histogram_quantile(0, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, backend, duration), "Max": "histogram_quantile(1, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, backend, duration), "P90": "histogram_quantile(0.9, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, backend, duration), "P99": "histogram_quantile(0.99, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, backend, duration), } } for query_name, query in queries[metric_type].items(): # Configure respective query url='https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus/api/v1/query' % (project_id) headers_api = {'Authorization': 'Bearer ' + credentials.token} params = {'query': query} print(f"Finding {query_name} {metric} with the following query: {query}") request_post = requests.get(url=url, headers=headers_api, params=params) response = request_post.json() print(f"Got response from metrics server: {response}") # handle response if request_post.ok: if response["status"] == "success": metric_results[query_name] = float(response["data"]["result"][0]["value"][1]) print("%s: %s" % (query_name, response["data"]["result"][0]["value"][1])) else: print("Cloud Monitoring PromQL Error: %s" % (response["error"])) else: print("HTTP Error: %s" % (response)) server_metrics[metric] = metric_results return server_metrics def get_stats_for_set(name, description, points): avg = np.mean(points) if points else 0 median = np.median(points) if points else 0 sd = np.std(points) if points else 0 min = np.min(points) if points else 0 max = np.max(points) if points else 0 p90 = np.percentile(points, 90) if points else 0 p99 = np.percentile(points, 99) if points else 0 print(f"Average {description}:" f" {avg:.2f}") return { f'avg_{name}': avg, f'median_{name}': median, f'sd_{name}': sd, f'min_{name}': min, f'max_{name}': max, f'p90_{name}': p90, f'p99_{name}': p99, } def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_requests, model, request_latencies, ttfts, errors): benchmark_result = {} print(f"====Result for Model: {model}====") print(f"Errors: {errors}") print(f"Total time: {benchmark_duration:.2f} s") print(f"Successful/total requests: {len(request_latencies)}/{total_requests}") print(f"Requests/min: {60 * total_requests / benchmark_duration:.2f}") benchmark_result["num_prompts_attempted"] = total_requests benchmark_result["num_prompts_succeeded"] = len(request_latencies) benchmark_result['benchmark_time'] = benchmark_duration benchmark_result['throughput_rps'] = (args.num_prompts / benchmark_duration) total_output_tokens = np.sum([output_len for _, output_len, _ in request_latencies]) output_tokens_per_second = total_output_tokens / benchmark_duration benchmark_result['throughput'] = output_tokens_per_second output_tokens_per_min = 60 * output_tokens_per_second print(f"Output_tokens/min: {output_tokens_per_min:.2f}") benchmark_result['total_output_token'] = int(total_output_tokens) benchmark_result['output_tokens_per_min'] = output_tokens_per_min total_input_tokens = np.sum([prompt_len for prompt_len, _, _ in request_latencies]) input_tokens_per_min = 60 * total_input_tokens / benchmark_duration print(f"Input_tokens/min: {input_tokens_per_min:.2f}") benchmark_result['total_input_tokens'] = int(total_input_tokens) benchmark_result['input_tokens_per_min'] = input_tokens_per_min total_tokens = total_input_tokens + total_output_tokens tokens_per_min = 60 * total_tokens / benchmark_duration print(f"Tokens/min: {tokens_per_min:.2f}") benchmark_result['total_tokens'] = int(total_tokens) benchmark_result['tokens_per_min'] = tokens_per_min ttft_stats = {} if args.stream_request: ttft_stats = get_stats_for_set("TTFT", "Time to First Token (s)", ttfts) if args.machine_cost: print( "Cost $/1k tokens:" f" {args.machine_cost * 1000 / (60 * output_tokens_per_min)}" ) benchmark_result = { **benchmark_result, **(get_stats_for_set("per_token_latency", "seconds/token (includes waiting time on server)", [ latency / (prompt_len + output_len) for prompt_len, output_len, latency in request_latencies ])), **ttft_stats, # NOTE: The latency below includes requests awaiting time on server side. # It's not comparable with the model inference latency for batch size 1. **(get_stats_for_set("latency", "milliseconds/request (includes waiting time on server)" ,[1000 * latency for _, _, latency in request_latencies])), **(get_stats_for_set("per_output_token_latency", "milliseconds/output_token (includes waiting time on server)", [1000 * latency / output_len for _, output_len, latency in request_latencies])), **(get_stats_for_set("input_len", "input length", [float(prompt_len) for prompt_len, _, _ in request_latencies])), **(get_stats_for_set("output_len", "output length", [float(output_len) for _, output_len, _ in request_latencies])) } server_metrics = {} if args.scrape_server_metrics: server_metrics = print_metrics(metrics_to_scrape(args.backend), benchmark_duration, args.backend) if args.save_json_results: save_json_results(args, benchmark_result, server_metrics, model, errors) async def main(args: argparse.Namespace): print(args) models = args.models.split(',') print(f"Models to benchmark: {models}") random.seed(args.seed) np.random.seed(args.seed) endpoint = ( "v1/completions" if args.backend == "vllm" else args.endpoint ) # Create GCS client before benchmarking # Should fail fast if client is misconfigured or missing permissions if args.output_bucket is not None: global gcs_client gcs_client = storage.Client() global gcs_bucket gcs_bucket = gcs_client.bucket(args.output_bucket) if args.output_bucket_filepath: blob = gcs_bucket.blob(args.output_bucket_filepath) if not blob.exists(): blob.upload_from_string('') print(f"Starting Prometheus Server on port {PROMETHEUS_PORT}") start_http_server(PROMETHEUS_PORT) api_url = f"http://{args.host}:{args.port}/{endpoint}" tokenizer = AutoTokenizer.from_pretrained( args.tokenizer, trust_remote_code=args.trust_remote_code ) benchmark_start_time = time.time() args.start_datetime = datetime.fromtimestamp(benchmark_start_time) results = await asyncio.gather( *[benchmark(args, api_url, tokenizer, model) for model in models] ) # Summarize results combined_latencies = [] combined_ttfts = [] combined_errors = { "ClientConnectorError": 0, "TimeoutError": 0, "ContentTypeError": 0, "ClientOSError": 0, "unknown_error": 0, "ServerDisconnectedError": 0, } for latencies, ttfts, errors in results: combined_latencies.extend(latencies) combined_ttfts.extend(ttfts) for k, v in errors.items(): combined_errors[k] = combined_errors[k] + v benchmark_duration_all_models = time.time() - benchmark_start_time if args.save_aggregated_result: print_and_save_result(args, benchmark_duration_all_models, len(models)*args.num_prompts, f"ALL-{len(models)}-MODELS", combined_latencies, combined_ttfts, combined_errors) if __name__ == "__main__": parser = argparse.ArgumentParser( description="Benchmark the online serving throughput." ) parser.add_argument( "--backend", type=str, default="vllm", choices=[ "vllm", "tgi", "naive_transformers", "tensorrt_llm_triton", "sax", "jetstream" ], ) parser.add_argument( "--sax_model", type=str, default="", help="Model name to send request to at API server for SAX model server.", ) parser.add_argument("--file-prefix", type=str, default="benchmark") parser.add_argument("--endpoint", type=str, default="generate") parser.add_argument("--host", type=str, default="localhost") parser.add_argument("--port", type=int, default=7080) parser.add_argument("--dataset", type=str, help="Path to the dataset.") parser.add_argument( "--models", type=str, help="Comma separated list of models to benchmark.", ) parser.add_argument( "--stream-request", action="store_true", help="Whether to stream the request. Needed for TTFT metric", ) parser.add_argument( "--request-timeout", type=float, default=(3.0 * 60.0 * 60.0), help="Individual request timeout", ) parser.add_argument( "--tokenizer", type=str, required=True, help="Name or path of the tokenizer.", ) parser.add_argument( "--best-of", type=int, default=1, help="Generates `best_of` sequences per prompt and returns the best one.", ) parser.add_argument("--use-beam-search", action="store_true") parser.add_argument( "--num-prompts", type=int, default=1000, help="Number of prompts to process.", ) parser.add_argument( "--max-input-length", type=int, default=1024, help=( "Maximum number of input tokens for filtering the benchmark dataset." ), ) parser.add_argument( "--max-output-length", type=int, default=1024, help=( "Maximum number of input tokens for filtering the benchmark dataset." ), ) parser.add_argument( "--top-k", type=int, default=32000, help=( "Number of candidate tokens that are considered at each step of the" " generation process. 32000 is the vocab_size of Open-LLaMA and" " LLaMA2 models." ), ) parser.add_argument( "--request-rate", type=float, default=float("inf"), help=( "Number of requests per second. If this is inf, " "then all the requests are sent at time 0. " "Otherwise, we use Poisson process to synthesize " "the request arrival times." ), ) parser.add_argument("--seed", type=int, default=int(time.time())) parser.add_argument( "--trust-remote-code", action="store_true", help="trust remote code from huggingface", ) parser.add_argument( "--machine-cost", type=float, default=None, help="Machine cost per hour including accelerators (if any)", ) parser.add_argument( "--use-dummy-text", action="store_true", help=( "Whether to use dummy text with length defined by max_input_length" " and max_output_length." ), ) parser.add_argument( "--save-json-results", action="store_true", help="Whether to save benchmark results to a json file.", ) parser.add_argument( "--output-bucket", type=str, default=None, help=( "Specifies the Google Cloud Storage bucket to which JSON-format results" " will be uploaded. If not provided, no upload will occur." ) ) parser.add_argument( "--output-bucket-filepath", type=str, default=None, help=( "Specifies the destination path within the bucket provided by" " --output-bucket for uploading the JSON results. This argument requires" " --output-bucket to be set. If not specified, results will be uploaded " " to the root of the bucket. If the filepath doesnt exist, it will be" " created for you." ) ) parser.add_argument( "--save-aggregated-result", action="store_true", help="Whether to aggregate results of all models and save the result.", ) parser.add_argument( "--additional-metadata-metrics-to-save", type=str, help=( "Additional metadata about the workload. Should be a dictionary in" " the form of a string." ), ) parser.add_argument( "--scrape-server-metrics", action="store_true", help="Whether to scrape server metrics.", ) cmd_args = parser.parse_args() asyncio.run(main(cmd_args))