optimum/intel/neural_compressor/quantization.py (277 lines of code) (raw):

# Copyright 2022 The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy import inspect import logging import types import warnings from enum import Enum from pathlib import Path from typing import Callable, Optional, Union import torch from datasets import Dataset, load_dataset from huggingface_hub.constants import HUGGINGFACE_HUB_CACHE from neural_compressor.config import PostTrainingQuantConfig from neural_compressor.model.torch_model import IPEXModel, PyTorchModel from neural_compressor.quantization import fit from neural_compressor.transformers import GPTQConfig, RtnConfig from neural_compressor.transformers.quantization import convert_to_quantized_model, save_low_bit from torch.utils.data import DataLoader, RandomSampler from transformers import ( DataCollator, PretrainedConfig, PreTrainedModel, default_data_collator, ) from optimum.exporters import TasksManager from optimum.quantization_base import OptimumQuantizer from ..utils.constant import _TASK_ALIASES, WEIGHTS_NAME from ..utils.import_utils import ( _ipex_version, _neural_compressor_version, is_ipex_version, is_neural_compressor_version, ) from .configuration import INCConfig from .utils import ( IPEX_MINIMUM_VERSION, NEURAL_COMPRESSOR_MINIMUM_VERSION, INCDataLoader, ) logger = logging.getLogger(__name__) if is_neural_compressor_version("<", NEURAL_COMPRESSOR_MINIMUM_VERSION): raise ImportError( f"Found an incompatible version of neural-compressor. Found version {_neural_compressor_version}, " f"but only version {NEURAL_COMPRESSOR_MINIMUM_VERSION} or higher is supported." ) class INCQuantizationMode(Enum): DYNAMIC = "post_training_dynamic_quant" STATIC = "post_training_static_quant" AWARE_TRAINING = "quant_aware_training" WEIGHT_ONLY = "post_training_weight_only" SUPPORTED_QUANT_MODE = {approach.value for approach in INCQuantizationMode} class INCQuantizer(OptimumQuantizer): """ Handle the Neural Compressor quantization process. """ def __init__( self, model: Union[PreTrainedModel, torch.nn.Module], eval_fn: Optional[Callable[[PreTrainedModel], int]] = None, calibration_fn: Optional[Callable[[PreTrainedModel], int]] = None, task: Optional[str] = None, seed: int = 42, ): """ Args: model (`torch.nn.Module`): The model to quantize. eval_fn (`Callable[[PreTrainedModel], int]`, defaults to None): The evaluation function to use for the accuracy driven strategy of the quantization process. The accuracy driven strategy will be enabled only if `eval_fn` is provided. task (`str`, defaults to None): The task defining the model topology. Will try to infer it from model if not provided. seed (`int`, defaults to 42): The random seed to use when shuffling the calibration dataset. """ super().__init__() self._original_model = model self.eval_fn = eval_fn if eval_fn is not None else lambda model: 1 self.calibration_fn = calibration_fn self.task = task self.seed = seed signature = inspect.signature(self._original_model.forward) self._signature_columns = list(signature.parameters.keys()) self.input_names = None self._quantized_model = None @classmethod def from_pretrained(cls, model: PreTrainedModel, **kwargs): # TODO : Create model return cls(model, **kwargs) def quantize( self, quantization_config: Union["PostTrainingQuantConfig"], save_directory: Union[str, Path], calibration_dataset: Dataset = None, batch_size: int = 8, data_collator: Optional[DataCollator] = None, remove_unused_columns: bool = True, file_name: str = None, **kwargs, ): """ Quantize a model given the optimization specifications defined in `quantization_config`. Args: quantization_config (`Union[PostTrainingQuantConfig]`): The configuration containing the parameters related to quantization. save_directory (`Union[str, Path]`): The directory where the quantized model should be saved. calibration_dataset (`datasets.Dataset`, defaults to `None`): The dataset to use for the calibration step, needed for post-training static quantization. batch_size (`int`, defaults to 8): The number of calibration samples to load per batch. data_collator (`DataCollator`, defaults to `None`): The function to use to form a batch from a list of elements of the calibration dataset. remove_unused_columns (`bool`, defaults to `True`): Whether or not to remove the columns unused by the model forward method. """ save_directory = Path(save_directory) save_directory.mkdir(parents=True, exist_ok=True) calibration_dataloader = None default_name = WEIGHTS_NAME self._set_task() if kwargs.pop("weight_only", None) is not None: logger.warning( "`weight_only` is deprecated. Use `quantization_config` instead to specify which methodology and quantization pamraters to apply." ) if ( isinstance(quantization_config, PostTrainingQuantConfig) and quantization_config.backend == "ipex" and is_ipex_version("<", IPEX_MINIMUM_VERSION) and "generation" in self.task ): raise ImportError( f"Found an incompatible version of intel-extension-for-pytorch. Found version {_ipex_version}, " f"but only version {IPEX_MINIMUM_VERSION} or higher is supported." ) if INCQuantizationMode(quantization_config.approach) == INCQuantizationMode.STATIC: # Since PyTorch fx trace does not really require an example_inputs, only need calibration_dataset or calibration_fn here. if calibration_dataset is None and self.calibration_fn is None: raise ValueError( "Post-training static quantization needs a calibration dataset or a calibration_function." ) if calibration_dataset is not None: quantization_config.calibration_sampling_size = len(calibration_dataset) calibration_dataloader = self._get_calibration_dataloader( calibration_dataset=calibration_dataset, batch_size=batch_size, remove_unused_columns=remove_unused_columns, data_collator=data_collator, ) if isinstance(self._original_model.config, PretrainedConfig): self._original_model.config.backend = quantization_config.backend compressed_model = fit( self._original_model, conf=quantization_config, calib_dataloader=calibration_dataloader, eval_func=self.eval_fn, calib_func=self.calibration_fn, ) if not hasattr(compressed_model, "_model") or compressed_model._model is None: raise RuntimeError("Calling `neural_compressor.fit` returned unexpected results") if isinstance(self._original_model.config, PretrainedConfig): # If backend is IPEX, then the quantized model is JIT model which will drop the config attribute, # so need set config from original_model. model_config = copy.deepcopy(self._original_model.config) model_config.torch_dtype = "int8" if isinstance(compressed_model, IPEXModel): model_config.torchscript = True model_config.backend = "ipex" model_config.save_pretrained(save_directory) self._quantized_model = compressed_model._model output_path = save_directory.joinpath(file_name or default_name) # Save the quantized model self._save_pretrained(compressed_model, output_path) quantization_config = INCConfig(quantization=quantization_config) quantization_config.save_pretrained(save_directory) @staticmethod def _save_pretrained(model: Union[PyTorchModel, IPEXModel], output_path: str): if isinstance(model, IPEXModel): model._model.save(output_path) else: state_dict = model._model.state_dict() if hasattr(model, "q_config"): state_dict["best_configure"] = model.q_config torch.save(state_dict, output_path) logger.info(f"Model weights saved to {output_path}") def _set_task(self): if self.task is None: try: # using the actual model has better chances of success # since using the model path does not work with local models self.task = TasksManager.infer_task_from_model(self._original_model) except Exception as e: self.task = "default" logger.warning( f"The task could not be automatically inferred and will be set to {self.task}. " f"Please provide the task argument with the relevant task from {', '.join(TasksManager.get_all_tasks())}. Detailed error: {e}" ) self.task = _TASK_ALIASES.get(self.task, self.task) if self.task == "text2text-generation": raise ValueError("Seq2Seq models are currently not supported for post-training static quantization.") def get_calibration_dataset( self, dataset_name: str, num_samples: int = 100, dataset_config_name: Optional[str] = None, dataset_split: str = "train", preprocess_function: Optional[Callable] = None, preprocess_batch: bool = True, use_auth_token: Optional[Union[bool, str]] = None, token: Optional[Union[bool, str]] = None, ) -> Dataset: """ Create the calibration `datasets.Dataset` to use for the post-training static quantization calibration step. Args: dataset_name (`str`): The dataset repository name on the Hugging Face Hub or path to a local directory containing data files in generic formats and optionally a dataset script, if it requires some code to read the data files. num_samples (`int`, defaults to 100): The maximum number of samples composing the calibration dataset. dataset_config_name (`str`, *optional*): The name of the dataset configuration. dataset_split (`str`, defaults to `"train"`): Which split of the dataset to use to perform the calibration step. preprocess_function (`Callable`, *optional*): Processing function to apply to each example after loading dataset. preprocess_batch (`bool`, defaults to `True`): Whether the `preprocess_function` should be batched. use_auth_token (Optional[Union[bool, str]], defaults to `None`): Deprecated. Please use `token` instead. token (Optional[Union[bool, str]], defaults to `None`): The token to use as HTTP bearer authorization for remote files. If `True`, will use the token generated when running `huggingface-cli login` (stored in `~/.huggingface`). Returns: The calibration `datasets.Dataset` to use for the post-training static quantization calibration step. """ if use_auth_token is not None: warnings.warn( "The `use_auth_token` argument is deprecated and will be removed soon. Please use the `token` argument instead.", FutureWarning, ) if token is not None: raise ValueError("You cannot use both `use_auth_token` and `token` arguments at the same time.") token = use_auth_token calibration_dataset = load_dataset( dataset_name, name=dataset_config_name, split=dataset_split, token=token, ) if num_samples is not None: num_samples = min(num_samples, len(calibration_dataset)) calibration_dataset = calibration_dataset.shuffle(seed=self.seed).select(range(num_samples)) if preprocess_function is not None: calibration_dataset = calibration_dataset.map(preprocess_function, batched=preprocess_batch) return calibration_dataset def _get_calibration_dataloader( self, calibration_dataset: Dataset, batch_size: int, remove_unused_columns: bool, data_collator: Optional[DataCollator] = None, use_label: Optional[bool] = True, ) -> INCDataLoader: data_collator = data_collator if data_collator is not None else default_data_collator if remove_unused_columns: calibration_dataset = self._remove_unused_columns(calibration_dataset) self.input_names = getattr(calibration_dataset, "column_names", None) generator = torch.Generator() generator.manual_seed(self.seed) sampler = RandomSampler(calibration_dataset, generator=generator) calibration_dataloader = DataLoader( calibration_dataset, batch_size=batch_size, sampler=sampler, collate_fn=data_collator, drop_last=False, ) return INCDataLoader.from_pytorch_dataloader(calibration_dataloader, use_label) def _remove_unused_columns(self, dataset: Dataset): ignored_columns = list(set(dataset.column_names) - set(self._signature_columns)) return dataset.remove_columns(ignored_columns) def _weight_only_quantization( model_class, model_id: Union[str, Path], quantization_config: Union[RtnConfig, GPTQConfig], token: Optional[Union[bool, str]] = None, revision: Optional[str] = None, force_download: bool = False, cache_dir: str = HUGGINGFACE_HUB_CACHE, local_files_only: bool = False, subfolder: str = "", trust_remote_code: bool = False, **kwargs, ): device_map = kwargs.get("device_map", None) if device_map is None: device_map = "xpu" if (hasattr(torch, "xpu") and torch.xpu.is_available()) else "cpu" else: device_map = device_map.type if isinstance(device_map, torch.device) else device_map use_xpu = device_map == torch.device("xpu") or device_map == "xpu" if use_xpu and (not hasattr(torch, "xpu") or not torch.xpu.is_available()): raise AssertionError("There is no xpu device in this system!") if is_neural_compressor_version("<=", "3.0"): raise AssertionError("Please install neural_compressor > v3.0") if is_ipex_version("<", "2.3.1") and use_xpu: raise AssertionError("Please install intel_extension_for_pytorch >= v2.3.1.") loading_kwargs = { "subfolder": subfolder, "revision": revision, "cache_dir": cache_dir, "token": token, "local_files_only": local_files_only, "force_download": force_download, "trust_remote_code": trust_remote_code, } low_cpu_mem_usage = True if getattr(quantization_config, "use_layer_wise", False): if is_neural_compressor_version(">=", "3.2"): from neural_compressor.torch import load_empty_model model = load_empty_model(model_id, cls=model_class, **loading_kwargs) else: raise ValueError("INC version must be >= 3.2 when use_layer_wise is set to True in quantization_config.") else: model = model_class.from_pretrained(model_id, low_cpu_mem_usage=low_cpu_mem_usage, **loading_kwargs) if use_xpu: quantization_config.update(**{"device": "xpu"}) quantization_config.post_init_xpu() else: quantization_config.post_init_cpu() model.config.update({"low_cpu_mem_usage": low_cpu_mem_usage}) model.eval() if (not torch.cuda.is_available() or device_map == "cpu") and model.config.model_type == "chatglm": model = model.float() model = convert_to_quantized_model(model, quantization_config, device=device_map) quantization_config.remove_redundant_parameters() model.config.quantization_config = quantization_config # add quantization_config and save_low_bit to pretrained model dynamically model.device_map = device_map model.quantization_config = quantization_config model.save_pretrained = types.MethodType(save_low_bit, model) return model