assets/training/model_evaluation/src_distributed/model_prediction.py

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Run script to infer."""

# flake8: noqa: E702

import sys
# sys.path.append("/src/")
import json
import os
import yaml
import torch
import pandas as pd
import numpy as np
import math
import importlib
import time

from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor
from local_constants import (
    ArgumentLiterals,
    ModelPath,
    TEXT_TOKEN_TASKS,
    PerformanceColumns,
    FILTER_MODEL_PREDICTION_PARAMS,
)
from local_constants import LLM_FT_PREPROCESS_FILENAME, LLM_FT_CHAT_COMPLETION_KEY, ChatCompletionConstants
from itertools import repeat
from accelerate import PartialState
import torch.distributed as dist
from datetime import datetime, timezone

from data_utils import read_model_prediction_data, prepare_data, prepare_chat_data_from_ft_pipeline
from prepare_data import _clean_and_validate_dataset, validate_and_get_columns
from exceptions import PredictException, DataLoaderException, ModelLoadingException
from error_definitions import ModelPredictionInternalError, BadModel, BadInputData
from llm.optimized.inference.constants import EngineName, TaskType, SupportedTask, ALL_TASKS, VLLMSupportedModels, MIISupportedModels
from llm.optimized.inference.fm_score import FMScore
from logging_utilities import get_logger, get_azureml_exception, log_traceback, swallow_all_exceptions
from llm.optimized.inference.managed_inference import MIRPayload
from llm.optimized.inference.model_utils import build_configs_from_model, get_generator_params

logger = get_logger(name=__name__)

DEVICE_COUNT = torch.cuda.device_count()

distributed_state = PartialState()


class Predictor:
    """Predictor class for distributed inference using container."""

    def __init__(self, engine, task_type, extra_params, num_replicas, label_column_name, tokenizer,
                 extra_y_test_cols=None) -> None:
        """Model Predictor.

        Args:
            engine (FMScore): Initialized inference engine used to run predictions.
            task_type (str): Evaluation task type.
            extra_params (dict): Extra generation parameters passed with each request.
            num_replicas (int): Number of model replicas serving requests in parallel.
            label_column_name (str): Name of the ground-truth column.
            tokenizer (Tokenizer): Tokenizer used for chat templates and token counting.
            extra_y_test_cols (str, optional): Additional ground-truth columns. Defaults to None.
        """
        self.engine = engine
        self.task_type = task_type
        self.extra_params = extra_params
        self.num_replicas = num_replicas
        self.label_column_name = label_column_name
        self.tokenizer = tokenizer
        self.extra_y_test_cols = extra_y_test_cols
        self._filter_param()

    def _filter_param(self):
        if isinstance(self.extra_params, dict):
            for param in FILTER_MODEL_PREDICTION_PARAMS:
                if self.extra_params.get(param, None):
                    logger.info(f"Filtering param {param} from extra_params")
                    params_dict = self.extra_params.pop(param)
                    self.extra_params.update(params_dict)
        logger.info(f"Extra params dict after filtering: {self.extra_params}")
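
    # Illustrative sketch (not part of the component): _filter_param flattens nested
    # parameter dictionaries whose keys are listed in FILTER_MODEL_PREDICTION_PARAMS
    # into extra_params. The key name below is hypothetical; the real keys come from
    # local_constants.
    #
    #   extra_params = {"generation_config": {"max_new_tokens": 256}, "temperature": 0.1}
    #   # if "generation_config" were listed in FILTER_MODEL_PREDICTION_PARAMS,
    #   # after _filter_param() extra_params would become:
    #   #   {"temperature": 0.1, "max_new_tokens": 256}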

    def postprocess(self, result):
        """Post process computed predictions.

        Args:
            result (list): Per-batch tuples of (predictions, ground truth, performance data, prediction probabilities).

        Returns:
            tuple: Concatenated prediction, ground-truth, performance and probability dataframes.
        """
        y_pred_df, y_test_df, perf_df, y_pred_proba_df = pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
        for y_pred, y_test, perf, pred_probas in result:
            logger.info(f"Type here as well: {type(y_test)}")
            y_pred_df = pd.concat([y_pred_df, y_pred], axis=0)
            y_test_df = pd.concat([y_test_df, y_test], axis=0)
            perf_df = pd.concat([perf_df, perf], axis=0)
            y_pred_proba_df = pd.concat([y_pred_proba_df, pred_probas], axis=0)
        ground_truth_columns = [self.label_column_name]
        if self.extra_y_test_cols is not None:
            ground_truth_columns += self.extra_y_test_cols
        y_test_df.columns = ground_truth_columns[:]
        return y_pred_df, y_test_df, perf_df, y_pred_proba_df

    def predict(self, data):
        """Predict method for full data.

        Args:
            data (iterable): Iterable of (X_test, y_test) batches.

        Returns:
            tuple: Post-processed prediction, ground-truth, performance and probability dataframes.
        """
        with ThreadPoolExecutor(max_workers=self.num_replicas) as executor:
            result = list(executor.map(
                self.predict_single,
                data,
            ))
        return self.postprocess(result)

    def _make_chat_completion_data(self, input_df, last_chats, col_name):
        appended_data = {col_name: []}
        input_rows = input_df.values.tolist()
        for ind, datarow in enumerate(input_rows):
            conversation = datarow[0]
            updated_conversation = conversation + [{"role": "assistant", "content": last_chats[ind]}]
            appended_data[col_name].append(updated_conversation)
        return pd.DataFrame(appended_data)
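
    # Illustrative sketch (not part of the component): _make_chat_completion_data
    # appends the generated assistant turn to each input conversation. With a
    # hypothetical row and response:
    #
    #   input_df.iloc[0, 0] == [{"role": "user", "content": "Hi"}]
    #   last_chats[0] == "Hello! How can I help?"
    #
    # the returned dataframe holds, under `col_name`:
    #
    #   [{"role": "user", "content": "Hi"},
    #    {"role": "assistant", "content": "Hello! How can I help?"}]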

    def predict_single(self, data):
        """Predict single batch.

        Args:
            data (tuple): (X_test, y_test) batch.

        Raises:
            exception: PredictException wrapping any inference failure.

        Returns:
            tuple: Prediction, ground-truth, performance and probability dataframes for the batch.
        """
        X_test, y_test = data
        try:
            input_texts = X_test.values.tolist()
            if isinstance(input_texts[0], list):
                if self.task_type == SupportedTask.CHAT_COMPLETION:
                    input_data = []
                    add_generation_prompt = self.extra_params.pop("add_generation_prompt", True)
                    for itext in input_texts:
                        input_data.append(self.tokenizer.apply_chat_template(itext[0], tokenize=False,
                                                                             add_generation_prompt=add_generation_prompt))
                    input_texts = input_data[:]
                    self.extra_params.update({"return_full_text": False})
                    payload = MIRPayload(input_texts, self.extra_params, TaskType.CONVERSATIONAL, False)
                else:
                    input_texts = [i[0] if len(i) == 1 else [j.strip() for j in i] for i in input_texts]
                    if self.task_type == SupportedTask.TEXT_GENERATION:
                        if "return_full_text" not in self.extra_params:
                            self.extra_params["return_full_text"] = False
                    if self.task_type == SupportedTask.QnA:
                        self.extra_params.update({"truncation": "longest_first"})
                    data = {
                        "input_data": {
                            "input_string": input_texts,
                            "parameters": self.extra_params,
                        }
                    }
                    payload = MIRPayload.from_dict(data)
                    payload.update_params(get_generator_params(payload.params))
                    # First run settles on a truncation strategy the engine accepts;
                    # the timed run below measures latency with the working parameters.
                    try:
                        inference_results = self.engine.run(payload)
                    except:
                        try:
                            logger.info("Failed with longest_first")
                            payload.params["truncation"] = "only_second"
                            inference_results = self.engine.run(payload)
                        except:
                            logger.info("Failed with only_second")
                            payload.params["truncation"] = "only_first"
                            inference_results = self.engine.run(payload)

            logger.info(
                f"Processing new request with parameters: {payload.params}"
            )

            inference_results = None
            if self.task_type == SupportedTask.CHAT_COMPLETION:
                payload.convert_query_to_list()
                start_ms = time.time() * 1000
                inference_results = self.engine.run(payload)
                end_ms = time.time() * 1000
                outputs = [res.response for i, res in enumerate(inference_results)]
                pred_probas = [res.scores for res in inference_results]
            else:
                start_ms = time.time() * 1000
                inference_results = self.engine.run(payload)
                end_ms = time.time() * 1000
                if self.task_type == SupportedTask.TEXT_GENERATION:
                    outputs = []
                    for gt, res in zip(input_texts, inference_results):
                        # Strip the prompt from the generated text when the engine echoes it back.
                        if gt in res.response:
                            outputs.append(res.response[len(gt):])
                        else:
                            outputs.append(res.response)
                else:
                    outputs = [res.response for i, res in enumerate(inference_results)]
                pred_probas = [res.scores for res in inference_results]

            perf_data = [{
                PerformanceColumns.BATCH_SIZE_COLUMN_NAME: len(input_texts),
                PerformanceColumns.START_TIME_COLUMN_NAME: datetime.fromtimestamp(start_ms / 1000, timezone.utc).isoformat(),
                PerformanceColumns.END_TIME_COLUMN_NAME: datetime.fromtimestamp(end_ms / 1000, timezone.utc).isoformat(),
                PerformanceColumns.LATENCY_COLUMN_NAME: end_ms - start_ms,
                PerformanceColumns.OUTPUT_TOKENS_COLUMN_NAME: len(self.tokenizer(pred)) if self.tokenizer is not None else 0,
                PerformanceColumns.OUTPUT_CHARACTERS_COLUMN_NAME: len(pred) if isinstance(pred, str) else 1,
                PerformanceColumns.INPUT_CHARACTERS_COLUMN_NAME: len(gt) if isinstance(gt, str) else 1,
                PerformanceColumns.INPUT_TOKENS_COLUMN_NAME: len(self.tokenizer(gt)) if self.tokenizer is not None else 0
            } for gt, pred in zip(input_texts, outputs)]
            pred_proba_df = pd.DataFrame(pred_probas, index=X_test.index)
            perf_data = pd.DataFrame(perf_data)

            if self.task_type == SupportedTask.CHAT_COMPLETION or self.task_type == TaskType.CONVERSATIONAL:
                pred_df = self._make_chat_completion_data(X_test.copy(deep=True), outputs,
                                                          col_name=ChatCompletionConstants.OUTPUT_FULL_CONVERSATION)
                pred_df[ChatCompletionConstants.OUTPUT] = outputs
                y_test = pd.DataFrame(y_test, columns=["ground_truth"], index=X_test.index)
                # y_test = self._make_chat_completion_data(X_test.copy(deep=True), y_test, col_name="ground_truth")
                return pred_df, y_test, perf_data, pred_proba_df

            pred_df = pd.DataFrame(outputs, index=X_test.index, columns=["prediction"])
            if isinstance(y_test, pd.Series):
                y_test = y_test.to_frame()
            elif isinstance(y_test, np.ndarray) or isinstance(y_test, list):
                y_test = pd.DataFrame(y_test, index=X_test.index)
            return pred_df, y_test, perf_data, pred_proba_df
        except Exception as e:
            exception = get_azureml_exception(PredictException, ModelPredictionInternalError, e,
                                              wrap_azureml_ex=False, error=repr(e))
            log_traceback(exception, logger)
            raise exception


def _init_cuda_visible_devices():
    import torch

    if "CUDA_VISIBLE_DEVICES" in os.environ:
        return
    if (
        "NVIDIA_VISIBLE_DEVICES" in os.environ
        and os.environ["NVIDIA_VISIBLE_DEVICES"] != "all"
    ):
        # map the gpu ids to integers
        gpu_ids = os.environ["NVIDIA_VISIBLE_DEVICES"].split(",")
        gpu_ids = [str(i) for i in range(len(gpu_ids)) if gpu_ids[i] != "-1"]
    elif torch.cuda.is_available():
        gpu_ids = [str(i) for i in range(torch.cuda.device_count())]
    else:
        # if no GPU is available, don't set anything
        return
    os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(gpu_ids)
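
# Illustrative sketch (not part of the component): when NVIDIA_VISIBLE_DEVICES holds
# device identifiers (a hypothetical value below), _init_cuda_visible_devices maps them
# to positional integer indices, skipping entries marked "-1":
#
#   NVIDIA_VISIBLE_DEVICES="GPU-aaaa,GPU-bbbb,-1"  ->  CUDA_VISIBLE_DEVICES="0,1"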

def get_model_size(model_path):
    """Estimate size of model.

    Args:
        model_path (str): Path to the model folder on disk.

    Returns:
        float: Total size of the model folder in GB.
    """
    size = 0
    # get size
    for path, dirs, files in os.walk(model_path):
        for f in files:
            fp = os.path.join(path, f)
            size += os.path.getsize(fp)
    size /= (pow(1024, 3))
    return size


def get_best_engine(config_path, model_path):
    """Fetch best engine for model from architecture.

    Args:
        config_path (str): Path to the model's config.json.
        model_path (str): Path to the model folder on disk.

    Returns:
        str: Name of the inference engine to use (HF, vLLM or MII).
    """
    with open(config_path) as f:
        model_config = json.load(f)
    model_class = model_config["architectures"][0]
    folder_size = get_model_size(model_path)
    dtype = model_config.get("torch_dtype", "float32")
    if "float16" in dtype:
        model_size = folder_size//2
    else:
        model_size = folder_size//4
    best_engine = EngineName.HF
    if model_class in VLLMSupportedModels.Models:
        best_engine = EngineName.VLLM
    # TODO: Add logic for selecting MII Over VLLM using model size
    elif model_class in MIISupportedModels.Models:
        best_engine = EngineName.MII
    return best_engine


def load_data(task, test_data, label_column_name, input_column_names, extra_y_test_cols, batch_size):
    """Load input data.

    Args:
        task (str): Evaluation task type.
        test_data (str): Path to the test data.
        label_column_name (str): Name of the ground-truth column.
        input_column_names (list): Names of the input columns.
        extra_y_test_cols (list): Additional ground-truth columns.
        batch_size (int): Number of rows per batch.

    Returns:
        iterator: Batches of prepared (X_test, y_test) data.
    """
    all_cols = list(input_column_names)
    if label_column_name is not None:
        all_cols += [label_column_name]
    if extra_y_test_cols is not None:
        all_cols += extra_y_test_cols

    data = read_model_prediction_data(file_path=test_data, batch_size=batch_size)
    if task == SupportedTask.CHAT_COMPLETION and os.path.isdir(test_data) and \
            LLM_FT_PREPROCESS_FILENAME in os.listdir(test_data):
        logger.info(f"Run from Finetune Pipeline. Fetching chat completion data from {test_data}")
        data = map(prepare_chat_data_from_ft_pipeline, data)
        return data
    data = map(_clean_and_validate_dataset, data, repeat(all_cols), repeat(batch_size))
    data = map(prepare_data, data, repeat(task), repeat(label_column_name), repeat(False), repeat(extra_y_test_cols))
    return data


def _gather_predictions_deprecated(all_preds):
    preds, ground_truth, perf = [], [], []
    for res in all_preds:
        for ind, i in enumerate(res["index"]):
            preds.append((i, res["predictions"][ind]))
            ground_truth.append((i, res["ground_truth"][ind]))
    preds.sort(key=lambda x: x[0])
    ground_truth.sort(key=lambda x: x[0])
    return [j for i, j in preds], [j for i, j in ground_truth]


def _gather_predictions(all_preds):
    preds, ground_truth, perf, pred_probas = pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
    for res in all_preds:
        preds = pd.concat([preds, res["predictions"]], axis=0)
        ground_truth = pd.concat([ground_truth, res["ground_truth"]], axis=0)
        perf = pd.concat([perf, res["perf"]], axis=0)
        pred_probas = pd.concat([pred_probas, res["pred_probas"]], axis=0)
    preds.sort_index(inplace=True)
    ground_truth.sort_index(inplace=True)
    perf.sort_index(inplace=True)
    pred_probas.sort_index(inplace=True)
    return preds, ground_truth, perf, pred_probas
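
# Illustrative sketch (not part of the component): _gather_predictions concatenates the
# per-process result frames collected via all_gather_object and relies on the original
# dataframe index to restore global row order. With two hypothetical processes:
#
#   rank 0 predictions index: [0, 2],  rank 1 predictions index: [1, 3]
#   after concat + sort_index the combined frame is ordered [0, 1, 2, 3]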

def get_smart_defaults(model_path: str):
    """Compute tensor parallel and num_replicas from model and GPUs available.

    Args:
        model_path (str): Path to the model folder on disk.

    Raises:
        ValueError: If the available GPUs cannot hold even a single copy of the model.

    Returns:
        _type_: Tuple(int, int)
    """
    model_size_in_gb = get_model_size(model_path)
    avg_gpu_free_mem = sum([torch.cuda.mem_get_info(i)[0] for i in range(DEVICE_COUNT)]) / DEVICE_COUNT
    avg_gpu_free_mem = avg_gpu_free_mem / pow(1024, 3)  # Bytes to GBs
    logger.info(f"Got Model Size {model_size_in_gb} and average GPU memory per Device: {avg_gpu_free_mem}")
    num_possible_replicas = int(DEVICE_COUNT / math.ceil((model_size_in_gb / 0.8) / avg_gpu_free_mem))
    if num_possible_replicas == 0:
        logger.debug(
            "Tensor parallel / model replica calculation with extra memory for cache "
            "results in 0 replicas. Calculating without extra memory for cache.",
        )
        num_possible_replicas = int(DEVICE_COUNT / math.ceil((model_size_in_gb) / avg_gpu_free_mem))
        if num_possible_replicas == 0:
            raise ValueError("Not enough GPU to support model. Please select bigger SKU.")
    tensor_parallel = DEVICE_COUNT // num_possible_replicas
    return tensor_parallel, num_possible_replicas


def load_tokenizer(tokenizer_path, tokenizer_class, **tokenizer_load_kwargs):
    """Load model's tokenizer.

    Args:
        tokenizer_path (str): Path to the tokenizer files.
        tokenizer_class (str): Name of the transformers tokenizer class to load.

    Raises:
        exception: ModelLoadingException

    Returns:
        _type_: Tokenizer
    """
    module_name = "transformers"
    try:
        model_module = importlib.import_module(module_name)
        object_class = getattr(model_module, tokenizer_class)
    except (AttributeError, ImportError) as exc:
        exception = get_azureml_exception(ModelLoadingException, ModelPredictionInternalError, exc,
                                          wrap_azureml_ex=False, error=repr(exc))
        log_traceback(exception, logger)
        raise exception
    tokenizer = object_class.from_pretrained(tokenizer_path, **tokenizer_load_kwargs)
    return tokenizer


def strtobool(val):
    """Convert a string representation of truth to true (1) or false (0).

    True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
    are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
    'val' is anything else.
    """
    val = val.lower()
    if val in {"y", "yes", "t", "true", "on", "1"}:
        return 1
    if val in {"n", "no", "f", "false", "off", "0"}:
        return 0
    raise ValueError(f"invalid truth value {val!r}")


def is_fsdp_enabled():
    """Torch Fully Sharded Data Parallel enabled check."""
    return (
        torch.distributed.is_available()
        and torch.distributed.is_initialized()
        and strtobool(os.environ.get("ACCELERATE_USE_FSDP", "False")) == 1
        and strtobool(os.environ.get("FSDP_CPU_RAM_EFFICIENT_LOADING", "False")) == 1
    )
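
# Illustrative sketch (not part of the component): with hypothetical numbers, a 13 GB
# model on 4 GPUs with ~20 GB free each gives
#   ceil((13 / 0.8) / 20) = 1 GPU per replica  ->  num_replicas = int(4 / 1) = 4
#   tensor_parallel = 4 // 4 = 1
# while a 60 GB model on the same node gives ceil(75 / 20) = 4 GPUs per replica,
# i.e. num_replicas = 1 and tensor_parallel = 4.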

@swallow_all_exceptions(logger)
def main():
    """Initialize text-generation-inference server and client."""
    extra_params = {}
    logger.info("Init Start.")
    parser = ArgumentParser()
    # Inputs
    parser.add_argument("--mlflow_model", type=str, dest="mlflow_model", required=True)
    parser.add_argument("--parameters", type=str, dest="parameters", required=False, default="{}")
    parser.add_argument("--task", type=str, dest=ArgumentLiterals.TASK, required=True, choices=TEXT_TOKEN_TASKS)
    parser.add_argument("--data", type=str, dest=ArgumentLiterals.DATA, required=True)
    parser.add_argument("--label-column-name", type=lambda x: x.split(","),
                        dest=ArgumentLiterals.LABEL_COLUMN_NAME, required=False, default=None)
    parser.add_argument("--input-column-names",
                        type=lambda x: [i.strip() for i in x.split(",") if i and not i.isspace()],
                        dest=ArgumentLiterals.INPUT_COLUMN_NAMES, required=False, default=None)
    parser.add_argument("--batch-size", type=int, dest=ArgumentLiterals.BATCH_SIZE, required=False, default=None)
    parser.add_argument("--predictions", type=str, dest=ArgumentLiterals.PREDICTIONS, required=True)
    parser.add_argument("--ground-truth", type=str, dest=ArgumentLiterals.GROUND_TRUTHS, required=True)
    parser.add_argument("--performance-metadata", type=str, dest=ArgumentLiterals.PERFORMANCE_METADATA,
                        required=False, default=None)
    parser.add_argument("--prediction-probabilities", type=str, dest=ArgumentLiterals.PREDICTION_PROBABILITIES,
                        required=False, default=None)
    args, unknown_args = parser.parse_known_args()

    logger.info(f"Distributed Type: {distributed_state.distributed_type}")
    try:
        tensor_parallel, num_replicas = get_smart_defaults(args.mlflow_model)
    except Exception as e:
        exception = get_azureml_exception(ModelLoadingException, ModelPredictionInternalError, e,
                                          wrap_azureml_ex=False, error=repr(e))
        log_traceback(exception, logger)
        raise exception
    logger.info(f"Setting Num Replicas to: {num_replicas} and Tensor Parallel to {tensor_parallel}")
    os.environ["NUM_REPLICAS"] = str(num_replicas)
    os.environ["TENSOR_PARALLEL"] = str(tensor_parallel)

    data_path = args.data
    logger.info(f"Torch Current Device Count:{torch.cuda.device_count()}")
    logger.info(f"Got Params: {args.parameters}")
    extra_params.update(json.loads(args.parameters))
    logger.info(f"Got Model Path: {args.mlflow_model}")

    task_type = args.task
    input_column_names, label_column_name, extra_y_test_cols = validate_and_get_columns(vars(args))

    try:
        _init_cuda_visible_devices()

        abs_mlmodel_path = os.path.join(args.mlflow_model, ModelPath.MLMODEL_PATH)
        mlmodel = {}
        if abs_mlmodel_path and os.path.exists(abs_mlmodel_path):
            with open(abs_mlmodel_path) as f:
                mlmodel = yaml.safe_load(f)

        if os.path.exists(os.path.join(args.mlflow_model, ModelPath.DEFAULT_MLFLOW_MODEL_PATH)):
            model_path = os.path.join(
                args.mlflow_model,
                ModelPath.DEFAULT_MLFLOW_MODEL_PATH,
            )
            config_path = os.path.join(model_path, "config.json")
            tokenizer_path = os.path.join(args.mlflow_model, ModelPath.DEFAULT_TOKENIZER_PATH)
        else:
            model_path = os.path.join(args.mlflow_model, ModelPath.DEPRECATED_MLFLOW_MODEL_PATH)
            config_path = os.path.join(args.mlflow_model, ModelPath.DEPRECATED_MLFLOW_CONFIG_PATH, "config.json")
            if not os.path.exists(config_path):
                config_path = os.path.join(model_path, "config.json")
            tokenizer_path = os.path.join(args.mlflow_model, ModelPath.DEPRECATED_MLFLOW_TOKENIZER_PATH)
            if not os.path.exists(tokenizer_path):
                tokenizer_path = model_path

        inference_config = None
        if os.path.exists(os.path.join(args.mlflow_model, ModelPath.INFERENCE_CONFIG_PATH)):
            inference_config = os.path.join(args.mlflow_model, ModelPath.INFERENCE_CONFIG_PATH)

        engine_config, task_config, default_generator_configs, task_type, model_info = build_configs_from_model(
            mlmodel,
            model_path,
            config_path,
            tokenizer_path,
            inference_config
        )

        config = {
            "engine": engine_config,
            "task": task_config,
        }

        enable_character_counts, enable_token_counts = False, False
        if extra_params.get("token_count_per_sample", False):
            enable_token_counts = True
            extra_params.pop("token_count_per_sample")
        if extra_params.get("char_count_per_sample", False):
            enable_character_counts = True
            extra_params.pop("char_count_per_sample")
        tokenizer = None
        if (task_type in TEXT_TOKEN_TASKS and enable_token_counts) or \
                (task_type == SupportedTask.CHAT_COMPLETION or task_type == TaskType.CONVERSATIONAL):
            tokenizer = load_tokenizer(engine_config["tokenizer"],
                                       engine_config["ml_model_info"].get("hf_tokenizer_class", "AutoTokenizer"))

        g_fmscorer = FMScore(config)
        g_fmscorer.init()
        if os.environ.get("LOGGING_WORKER_ID", "") == str(os.getpid()):
            for k, v in os.environ.items():
                logger.info(f"env: {k} = {v}")
            logger.info(
                f"updated default_generator_configs: "
                f"{default_generator_configs}"
            )

    except Exception as e:
        exception = get_azureml_exception(ModelLoadingException, BadModel, e, error=repr(e))
        log_traceback(exception, logger)
        raise exception

    try:
        data = load_data(task_type, data_path, label_column_name, input_column_names, extra_y_test_cols,
                         args.batch_size)
    except Exception as e:
        exception = get_azureml_exception(DataLoaderException, BadInputData, e, error=repr(e))
        log_traceback(exception, logger)
        raise exception

    full_data = [(x, y) for x, y in data]
    logger.info(f"Dataset size: {len(full_data)}")
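
    # Descriptive note (added for clarity): each rank below scores only its shard of
    # full_data (split_between_processes), then dist.all_gather_object copies every
    # rank's result dict to all ranks so the main process can write the combined,
    # index-sorted outputs.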
    predictor = Predictor(g_fmscorer, task_type, extra_params, num_replicas, label_column_name, tokenizer,
                          extra_y_test_cols)
    collated_res = [{} for i in range(distributed_state.num_processes)]
    with distributed_state.split_between_processes(full_data) as proc_data:
        y_pred_proc, y_test_proc, y_perf_proc, y_pred_proba = predictor.predict(proc_data)
        proc_res = {"predictions": y_pred_proc, "ground_truth": y_test_proc, "perf": y_perf_proc,
                    "pred_probas": y_pred_proba}
        dist.all_gather_object(object_list=collated_res, obj=proc_res)

    logger.info("Waiting for all processes.....")
    distributed_state.wait_for_everyone()
    logger.info(f"Collated Results Lengths: {[len(i) for i in collated_res]}")

    y_pred_df, y_test_df, y_perf_df, y_pred_proba_df = _gather_predictions(collated_res)
    if task_type != SupportedTask.CHAT_COMPLETION and task_type != TaskType.CONVERSATIONAL:
        y_pred_df.columns = ["predictions"]
        ground_truth_columns = [label_column_name]
        if extra_y_test_cols is not None:
            ground_truth_columns += extra_y_test_cols
        y_test_df.columns = ground_truth_columns[:]

    if distributed_state.is_main_process:
        y_pred_df.to_json(args.predictions, orient="records", lines=True)
        y_test_df.to_json(args.ground_truths, orient="records", lines=True)
        y_perf_df.to_json(args.performance_metadata, orient="records", lines=True)
        y_pred_proba_df.to_json(args.prediction_probabilities, orient="records", lines=True)
    return


if __name__ == "__main__":
    main()
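
# Illustrative sketch (not part of the component): a hypothetical local launch, assuming
# a multi-GPU node and torchrun as the process launcher; in the evaluation pipeline these
# arguments are wired in by the component definition, and the task/path values below are
# placeholders only.
#
#   torchrun --nproc-per-node=4 model_prediction.py \
#       --mlflow_model ./mlflow_model \
#       --task text-generation \
#       --data ./test_data.jsonl \
#       --label-column-name ground_truth \
#       --input-column-names prompt \
#       --batch-size 4 \
#       --predictions ./predictions.jsonl \
#       --ground-truth ./ground_truth.jsonl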