# optimum/intel/openvino/quantization.py
# Copyright 2022 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import inspect
import logging
import os
from collections import UserDict, deque
from contextlib import contextmanager
from io import BytesIO
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
import datasets
import nncf
import numpy as np
import openvino
import requests
import torch
import transformers
from huggingface_hub.constants import HUGGINGFACE_HUB_CACHE
from nncf.quantization.advanced_parameters import OverflowFix
from nncf.torch import register_module
from nncf.torch.initialization import PTInitializingDataLoader
from openvino import Core, Tensor
from openvino._offline_transformations import compress_quantize_weights_transformation
from PIL import Image
from torch.utils._pytree import tree_map
from torch.utils.data import DataLoader, RandomSampler
from tqdm import tqdm
from transformers import AutoProcessor, AutoTokenizer, DataCollator, PreTrainedModel, default_data_collator
from transformers.pytorch_utils import Conv1D
from transformers.utils import is_accelerate_available
from optimum.exporters.tasks import TasksManager
from optimum.quantization_base import OptimumQuantizer
from ...exporters.openvino import export, export_pytorch_via_onnx
from ...exporters.openvino.model_patcher import patch_model_with_bettertransformer
from ...exporters.openvino.stateful import ensure_export_task_support_stateful, ensure_stateful_is_available
from ..utils.constant import _TASK_ALIASES
from ..utils.import_utils import (
DATASETS_IMPORT_ERROR,
_nncf_version,
is_datasets_available,
is_datasets_version,
is_diffusers_available,
)
from ..utils.modeling_utils import get_model_device
from .configuration import (
OVConfig,
OVMixedQuantizationConfig,
OVPipelineQuantizationConfig,
OVQuantizationConfig,
OVQuantizationConfigBase,
OVQuantizationMethod,
OVWeightQuantizationConfig,
)
from .utils import (
MAX_ONNX_OPSET,
MIN_ONNX_QDQ_OPSET,
ONNX_WEIGHTS_NAME,
OV_XML_FILE_NAME,
PREDEFINED_LANGUAGE_DATASETS,
PREDEFINED_SD_DATASETS,
PREDEFINED_SPEECH_TO_TEXT_DATASETS,
PREDEFINED_TEXT_IMAGE_ENCODER_DATASETS,
PREDEFINED_VISUAL_LM_DATASETS,
)
if is_datasets_available():
from datasets import Dataset
register_module(ignored_algorithms=[])(Conv1D)
core = Core()
logger = logging.getLogger(__name__)
class OVCalibrationDataset(UserDict):
"""
A class to store calibration datasets for quantization with NNCF. Contains an instance of `nncf.Dataset` for each
pipeline model component. For example, for a sequence-to-sequence pipeline with `encoder_model` and `decoder_model`
components, the dictionary should contain two keys: `encoder_model` and `decoder_model`.
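    Example (a minimal sketch; the submodel names and the toy input dicts below are only illustrative):
    ```python
    >>> import nncf
    >>> # any iterable of model inputs accepted by nncf.Dataset can be used here
    >>> encoder_samples = [{"input_ids": [[1, 2, 3]]}]
    >>> decoder_samples = [{"decoder_input_ids": [[2]]}]
    >>> dataset = OVCalibrationDataset(
    ...     {"encoder_model": nncf.Dataset(encoder_samples), "decoder_model": nncf.Dataset(decoder_samples)}
    ... )
    >>> dataset["encoder_model"]  # dict-style access
    >>> dataset.decoder_model  # attribute-style access is also supported
    ```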
"""
def __init__(self, calibration_dataset: Union[nncf.Dataset, Dict[str, nncf.Dataset]]):
"""
Args:
calibration_dataset (`Union[nncf.Dataset, Dict[str, nncf.Dataset]]`):
The calibration dataset to store. Can be a single `nncf.Dataset` instance or a dictionary containing
`nncf.Dataset` instances for each model component. In the first case it is assumed that the dataset
corresponds to a pipeline component named "model".
"""
if isinstance(calibration_dataset, nncf.Dataset):
calibration_dataset = {"model": calibration_dataset}
super().__init__(calibration_dataset)
def __getattr__(self, item: str):
try:
return self.data[item]
except KeyError:
raise AttributeError
class OVDataLoader(PTInitializingDataLoader):
def get_inputs(self, dataloader_output) -> Tuple[Tuple, Dict]:
return (), dataloader_output
@property
def batch_size(self):
batch_size = self._data_loader.batch_size
if is_accelerate_available():
from accelerate.data_loader import DataLoaderStateMixin
if batch_size is None and isinstance(self._data_loader, DataLoaderStateMixin):
batch_size = self._data_loader.total_batch_size
return batch_size
class InferRequestWrapper:
"""
    Wrapper class for OV InferRequest or CompiledModel objects that collects the inputs they are called with
    into a list.
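    Example (a sketch assuming `model` is an already compiled `OVModelForXxx` instance exposing a `request`
    attribute, and `example_inputs` are valid inputs for it):
    ```python
    >>> collected_inputs = []
    >>> model.request = InferRequestWrapper(model.request, collected_inputs)
    >>> model(**example_inputs)  # the inputs of this call are appended to `collected_inputs`
    >>> model.request = model.request.request  # restore the original request
    ```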
"""
def __init__(
self,
request: Union[openvino.InferRequest, openvino.CompiledModel],
collected_inputs: List = None,
apply_caching: bool = False,
inference_result_mock: Any = None,
):
"""
Args:
request (`Union[openvino.InferRequest, openvino.CompiledModel]`):
Infer request instance to wrap. May also be an instance of CompiledModel.
collected_inputs (`List`, *optional*):
List where collected inputs will be stored. If None, an empty list will be created
at self.collected_inputs.
apply_caching (`bool`, defaults to False):
                Whether to apply data caching. May reduce the memory footprint, but incurs a slight performance overhead
due to tensor hash computation.
inference_result_mock (`Any`, *optional*):
                If provided, the wrapped request is not executed and this value is returned instead, which speeds up
                input collection. This is useful when the actual model inference can be skipped. It should not be
                provided for models whose inputs depend on previous inference results, e.g. encoder-decoder pipelines.
"""
self.request = request
self.collected_inputs = [] if collected_inputs is None else collected_inputs
self.apply_caching = apply_caching
self.inference_result_mock = inference_result_mock
self.tensor_cache = {}
def collect_inputs(self, inputs):
if not self.apply_caching or not isinstance(inputs, dict):
self.collected_inputs.append(copy.deepcopy(inputs))
return
copied_inputs = {}
for k, v in inputs.items():
data = v
if isinstance(data, openvino.Tensor):
data = data.data
if isinstance(data, torch.Tensor):
data = data.cpu().numpy()
data_hash = hash(data.tobytes())
# Avoid data copying if tensor contains data encountered earlier
self.tensor_cache.setdefault(k, {})
if data_hash not in self.tensor_cache[k]:
self.tensor_cache[k][data_hash] = copy.deepcopy(v)
copied_inputs[k] = self.tensor_cache[k][data_hash]
self.collected_inputs.append(copied_inputs)
def __call__(self, *args, **kwargs):
# If __call__ is invoked then self.request must be an instance of CompiledModel
signature = inspect.signature(self.request)
bound_args = signature.bind(*args, **kwargs).arguments
self.collect_inputs(bound_args["inputs"])
if self.inference_result_mock is None:
return self.request(*args, **kwargs)
return self.inference_result_mock
def infer(self, inputs: Any = None, share_inputs: bool = False):
self.collect_inputs(inputs)
if self.inference_result_mock is None:
return self.request.infer(inputs, share_inputs)
return self.inference_result_mock
def start_async(
self,
inputs: Any = None,
userdata: Any = None,
share_inputs: bool = False,
*,
shared_memory: Any = None,
):
self.collect_inputs(inputs)
if self.inference_result_mock is None:
self.request.infer(inputs, share_inputs, share_outputs=True)
return self.inference_result_mock
def wait(self):
pass
def get_tensor(self, name: str):
return Tensor(self.request.results[name])
def __getattr__(self, attr):
if attr in self.__dict__:
return getattr(self, attr)
return getattr(self.request, attr)
class OVCalibrationDatasetBuilder:
"""
A class to build calibration datasets for quantization with NNCF.
    Allows building a calibration dataset from:
- a `datasets.Dataset` object;
- a name of the dataset from `datasets`;
- a quantization config object containing dataset specification.
Returns calibration dataset as an instance of `OVCalibrationDataset` containing an `nncf.Dataset` for each model component.
For example, for a sequence-to-sequence model with `encoder_model` and `decoder_model` components, the dictionary
will contain two keys: `encoder_model` and `decoder_model`.
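    Example (a minimal sketch; `model_id` is a placeholder for an OpenVINO causal LM, and the dataset label must be
    one of the labels supported by `optimum.gptq.data.get_dataset`, e.g. "wikitext2"):
    ```python
    >>> from optimum.intel import OVModelForCausalLM, OVWeightQuantizationConfig
    >>> model = OVModelForCausalLM.from_pretrained(model_id)
    >>> builder = OVCalibrationDatasetBuilder(model)
    >>> config = OVWeightQuantizationConfig(bits=4, dataset="wikitext2", num_samples=128, tokenizer=model_id)
    >>> calibration_dataset = builder.build_from_quantization_config(config)
    >>> list(calibration_dataset.keys())  # ["model"]
    ```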
"""
def __init__(self, model: transformers.PreTrainedModel, seed: int = 42):
"""
Args:
model (`transformers.PreTrainedModel`):
The model to build calibration dataset for.
seed (`int`, defaults to 42):
Random seed to use for reproducibility.
"""
self.model = model
self.seed = seed
# TODO: deprecate "signature_columns": model.forward() may not be the method which is called during inference,
# for example there is model.generate()
signature = inspect.signature(self.model.forward)
self._signature_columns = list(signature.parameters.keys())
def build_from_quantization_config(self, config: OVQuantizationConfigBase) -> OVCalibrationDataset:
"""
Builds a calibration dataset from a quantization config object. Namely, `quantization_config.dataset` property
is used to infer dataset name.
Args:
config (`OVQuantizationConfigBase`):
The quantization configuration object.
Returns:
A calibration dataset as an instance of `OVCalibrationDataset` containing an `nncf.Dataset` for each model component.
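        Example (a sketch; `builder` is assumed to wrap a supported model, `model_id` and `dataset_label` are
        placeholders, and `dataset_label` must be one of the dataset names predefined for that model type):
        ```python
        >>> from optimum.intel import OVQuantizationConfig
        >>> config = OVQuantizationConfig(dataset=dataset_label, num_samples=32, processor=model_id, tokenizer=model_id)
        >>> calibration_dataset = builder.build_from_quantization_config(config)
        >>> list(calibration_dataset.keys())  # one entry per calibrated submodel
        ```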
"""
from optimum.intel import (
OVModelForCausalLM,
OVModelForFeatureExtraction,
OVModelForMaskedLM,
OVModelForVisualCausalLM,
OVModelForZeroShotImageClassification,
OVSentenceTransformer,
)
from optimum.intel.openvino.modeling_seq2seq import _OVModelForWhisper
if is_diffusers_available():
from optimum.intel.openvino.modeling_diffusion import OVDiffusionPipeline
if config.dataset is None:
raise ValueError("Please provide a dataset for calibration.")
if isinstance(self.model, OVModelForCausalLM):
return self._prepare_causal_lm_calibration_data(config)
elif isinstance(
self.model, (OVModelForVisualCausalLM, _OVModelForWhisper, OVModelForZeroShotImageClassification)
):
if config.processor is None:
raise ValueError(
"`processor` must be specified in order to run data-aware quantization. Please provide it as a"
"model id, or a path to a directory containing all the required configuration files."
)
if isinstance(self.model, OVModelForVisualCausalLM):
dataset_metadata = PREDEFINED_VISUAL_LM_DATASETS[config.dataset]
return self.build_from_dataset_name(
config,
dataset_metadata["id"],
num_samples=config.num_samples,
dataset_split=dataset_metadata["split"],
trust_remote_code=config.trust_remote_code,
)
elif isinstance(self.model, _OVModelForWhisper):
dataset_metadata = PREDEFINED_SPEECH_TO_TEXT_DATASETS[config.dataset]
return self.build_from_dataset_name(
config,
dataset_metadata["id"],
num_samples=config.num_samples, # This is an upper bound on how many audios are needed
dataset_config_name=dataset_metadata["name"],
dataset_split=dataset_metadata["split"],
trust_remote_code=config.trust_remote_code,
streaming=dataset_metadata["streaming"],
)
elif isinstance(self.model, OVModelForZeroShotImageClassification):
dataset_metadata = PREDEFINED_TEXT_IMAGE_ENCODER_DATASETS[config.dataset]
return self.build_from_dataset_name(
config,
dataset_metadata["id"],
num_samples=None,
dataset_split=dataset_metadata["split"],
trust_remote_code=config.trust_remote_code,
streaming=dataset_metadata["streaming"],
)
            else:
                raise RuntimeError(
                    f"Unsupported model type for calibration dataset collection: {type(self.model).__name__}."
                )
elif is_diffusers_available() and isinstance(self.model, OVDiffusionPipeline):
if isinstance(config.dataset, str):
dataset_name = config.dataset
dataset_metadata = PREDEFINED_SD_DATASETS[dataset_name]
dataset = self.load_dataset(
dataset_name,
num_samples=config.num_samples, # This is an upper bound on how many prompts are needed
dataset_split=dataset_metadata["split"],
streaming=dataset_metadata["streaming"],
)
elif isinstance(config.dataset, list) and all(isinstance(it, str) for it in config.dataset):
dataset = config.dataset
else:
raise RuntimeError(
"Please provide dataset as one of the accepted dataset labels or as a list of string prompts."
)
return self.build_from_dataset(config, dataset)
elif isinstance(self.model, (OVModelForFeatureExtraction, OVSentenceTransformer, OVModelForMaskedLM)):
if isinstance(config.dataset, str):
dataset_metadata = PREDEFINED_LANGUAGE_DATASETS[config.dataset]
dataset = self.load_dataset(
dataset_metadata["id"],
num_samples=None,
dataset_config_name=dataset_metadata["name"],
dataset_split=dataset_metadata["split"],
trust_remote_code=config.trust_remote_code,
streaming=dataset_metadata["streaming"],
)
elif isinstance(config.dataset, list) and all(isinstance(it, str) for it in config.dataset):
dataset = datasets.Dataset.from_list([{"text": it} for it in config.dataset])
else:
raise ValueError(
"Please provide dataset as one of the accepted dataset labels or as a list of strings."
)
return self.build_from_dataset(config, dataset)
else:
raise RuntimeError("Unsupported model type for calibration dataset collection.")
def build_from_dataset_name(
self,
quantization_config: OVQuantizationConfigBase,
dataset_name: str,
num_samples: Optional[int] = None,
dataset_config_name: Optional[str] = None,
dataset_split: str = "train",
preprocess_function: Optional[Callable] = None,
preprocess_batch: bool = True,
token: Optional[Union[bool, str]] = None,
cache_dir: str = HUGGINGFACE_HUB_CACHE,
trust_remote_code: bool = False,
streaming: bool = False,
batch_size: Optional[int] = 1,
data_collator: Optional[DataCollator] = None,
remove_unused_columns: bool = False,
) -> OVCalibrationDataset:
"""
Create the calibration `datasets.Dataset` to use for the post-training static quantization calibration step.
Args:
quantization_config (`OVQuantizationConfigBase`):
The quantization configuration object.
dataset_name (`str`):
The dataset repository name on the Hugging Face Hub or path to a local directory containing data files
in generic formats and optionally a dataset script, if it requires some code to read the data files.
dataset_config_name (`str`, *optional*):
The name of the dataset configuration.
num_samples (`int`, *optional*):
The maximum number of samples composing the calibration dataset.
dataset_split (`str`, defaults to `"train"`):
Which split of the dataset to use to perform the calibration step.
preprocess_function (`Callable`, *optional*):
Processing function to apply to each example after loading dataset.
preprocess_batch (`bool`, defaults to `True`):
Whether the `preprocess_function` should be batched.
token (Optional[Union[bool, str]], defaults to `None`):
The token to use as HTTP bearer authorization for remote files. If `True`, will use the token generated
when running `huggingface-cli login` (stored in `~/.huggingface`).
cache_dir (`str`, *optional*):
Caching directory for a calibration dataset.
trust_remote_code (`bool`, defaults to `False`):
Whether or not to allow for custom models defined on the Hub in their own modeling files. This option
should only be set to `True` for repositories you trust and in which you have read the code, as it will
execute code present on the Hub on your local machine.
streaming (`bool`, defaults to `False`):
Whether to load dataset in streaming mode.
batch_size (`int`, defaults to 1):
The number of calibration samples to load per batch.
data_collator (`DataCollator`, *optional*):
The function to use to form a batch from a list of elements of the calibration dataset.
remove_unused_columns (`bool`, defaults to `False`):
Whether to remove the columns unused by the model forward method.
Returns:
A calibration dataset as an instance of `OVCalibrationDataset` containing an `nncf.Dataset` for each model component.
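        Example (a sketch for a decoder model; `builder`, `quantization_config` and `model_id` are placeholders
        defined elsewhere, and the "wikitext" dataset is only an illustration):
        ```python
        >>> from transformers import AutoTokenizer
        >>> tokenizer = AutoTokenizer.from_pretrained(model_id)
        >>> def preprocess_fn(examples):
        ...     return tokenizer(examples["text"], padding=True, truncation=True, max_length=128)
        >>> calibration_dataset = builder.build_from_dataset_name(
        ...     quantization_config,
        ...     "wikitext",
        ...     dataset_config_name="wikitext-2-raw-v1",
        ...     num_samples=128,
        ...     preprocess_function=preprocess_fn,
        ... )
        ```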
"""
if remove_unused_columns:
logger.warning("`remove_unused_columns` is deprecated and will be removed in optimum-intel v1.25.")
dataset = self.load_dataset(
dataset_name,
num_samples,
dataset_config_name,
dataset_split,
preprocess_function,
preprocess_batch,
token,
cache_dir,
trust_remote_code,
streaming,
)
return self.build_from_dataset(quantization_config, dataset, batch_size, data_collator, remove_unused_columns)
def build_from_dataset(
self,
quantization_config: OVQuantizationConfigBase,
dataset: Union["Dataset", List],
batch_size: Optional[int] = 1,
data_collator: Optional[DataCollator] = None,
remove_unused_columns: bool = False,
) -> OVCalibrationDataset:
"""
Args:
quantization_config (`OVQuantizationConfigBase`):
The quantization configuration object.
dataset (`Union[datasets.Dataset, List]`):
The dataset to collect calibration data from.
batch_size (`int`, defaults to 1):
The number of calibration samples to load per batch. Not always used.
data_collator (`DataCollator`, *optional*):
The function to use to form a batch from a list of elements of the calibration dataset. Not always used.
remove_unused_columns (`bool`, defaults to `False`):
Whether to remove the columns unused by the model forward method. Not always used.
Returns:
A calibration dataset as an instance of `OVCalibrationDataset` containing an `nncf.Dataset` for each model component.
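        Example (a sketch; `builder` and `quantization_config` are placeholders, and the expected columns depend on
        the wrapped model type, e.g. a "text" column for text-encoder models):
        ```python
        >>> import datasets
        >>> prompts = datasets.Dataset.from_list([{"text": "a short calibration sentence"}])
        >>> calibration_dataset = builder.build_from_dataset(quantization_config, prompts)
        ```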
"""
from optimum.intel import (
OVModelForFeatureExtraction,
OVModelForMaskedLM,
OVModelForVisualCausalLM,
OVModelForZeroShotImageClassification,
OVSentenceTransformer,
)
from optimum.intel.openvino.modeling_decoder import OVBaseDecoderModel
from optimum.intel.openvino.modeling_seq2seq import _OVModelForWhisper
if is_diffusers_available():
from optimum.intel.openvino.modeling_diffusion import OVDiffusionPipeline
if isinstance(dataset, list):
logger.warning(
"Providing dataset as a list is deprecated and will be removed in optimum-intel v1.25. "
"Please provide it as `datasets.Dataset`."
)
if isinstance(
self.model,
(
OVModelForVisualCausalLM,
_OVModelForWhisper,
OVModelForFeatureExtraction,
OVModelForMaskedLM,
OVModelForZeroShotImageClassification,
OVSentenceTransformer,
),
) or (is_diffusers_available() and isinstance(self.model, OVDiffusionPipeline)):
# Prepare from raw dataset avoiding dataloader creation
if batch_size != 1 or data_collator is not None or remove_unused_columns:
logger.warning(
"`batch_size`, `data_collator` and `remove_unused_columns` are not supported for this type of model."
)
if isinstance(self.model, OVModelForVisualCausalLM):
return self._prepare_visual_causal_lm_calibration_data(quantization_config, dataset)
elif isinstance(self.model, _OVModelForWhisper):
return self._prepare_speech_to_text_calibration_data(quantization_config, dataset)
elif is_diffusers_available() and isinstance(self.model, OVDiffusionPipeline):
return self._prepare_diffusion_calibration_data(quantization_config, dataset)
elif isinstance(self.model, (OVModelForFeatureExtraction, OVSentenceTransformer, OVModelForMaskedLM)):
return self._prepare_text_encoder_model_calibration_data(quantization_config, dataset)
elif isinstance(self.model, OVModelForZeroShotImageClassification):
return self._prepare_text_image_encoder_model_calibration_data(quantization_config, dataset)
else:
raise RuntimeError("Unsupported model type for calibration dataset collection.")
else:
# Prepare from dataloader
            # Keep `remove_unused_columns=True` until the argument is fully deprecated and removed
dataloader = self._get_calibration_dataloader(
dataset, batch_size, data_collator, remove_unused_columns=True
)
if isinstance(self.model, OVBaseDecoderModel):
return self._prepare_decoder_calibration_data(quantization_config, dataloader)
else:
# Assuming this is the torch model quantization scenario
return OVCalibrationDataset({"model": nncf.Dataset(dataloader)})
def load_dataset(
self,
dataset_name: str,
num_samples: Optional[int] = None,
dataset_config_name: Optional[str] = None,
dataset_split: str = "train",
preprocess_function: Optional[Callable] = None,
preprocess_batch: bool = True,
token: Optional[Union[bool, str]] = None,
cache_dir: str = HUGGINGFACE_HUB_CACHE,
trust_remote_code: bool = False,
streaming: bool = False,
) -> "Dataset":
"""
Create the calibration `datasets.Dataset` to use for the post-training static quantization calibration step.
Args:
dataset_name (`str`):
The dataset repository name on the Hugging Face Hub or path to a local directory containing data files
in generic formats and optionally a dataset script, if it requires some code to read the data files.
num_samples (`int`, *optional*):
The maximum number of samples composing the calibration dataset.
dataset_config_name (`str`, *optional*):
The name of the dataset configuration.
dataset_split (`str`, defaults to `"train"`):
Which split of the dataset to use to perform the calibration step.
preprocess_function (`Callable`, *optional*):
Processing function to apply to each example after loading dataset.
preprocess_batch (`bool`, defaults to `True`):
Whether the `preprocess_function` should be batched.
token (Optional[Union[bool, str]], defaults to `None`):
The token to use as HTTP bearer authorization for remote files. If `True`, will use the token generated
when running `huggingface-cli login` (stored in `~/.huggingface`).
cache_dir (`str`, *optional*):
Caching directory for a calibration dataset.
trust_remote_code (`bool`, defaults to `False`):
Whether or not to allow for custom models defined on the Hub in their own modeling files. This option
should only be set to `True` for repositories you trust and in which you have read the code, as it will
execute code present on the Hub on your local machine.
streaming (`bool`, defaults to `False`):
Whether to load dataset in streaming mode.
Returns:
The calibration `datasets.Dataset` to use for the post-training static quantization calibration step.
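        Example (a sketch; `builder` is a placeholder and the dataset name and configuration are only illustrative):
        ```python
        >>> raw_dataset = builder.load_dataset(
        ...     "wikitext",
        ...     num_samples=128,
        ...     dataset_config_name="wikitext-2-raw-v1",
        ...     dataset_split="train",
        ... )
        ```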
"""
if not is_datasets_available():
raise ValueError(DATASETS_IMPORT_ERROR.format("OVCalibrationDatasetBuilder.load_dataset"))
from datasets import load_dataset
datasets_kwargs = {
"name": dataset_config_name,
"split": dataset_split,
"token": token,
"cache_dir": cache_dir,
"streaming": streaming,
}
if is_datasets_version(">=", "2.20.0"):
datasets_kwargs["trust_remote_code"] = trust_remote_code
dataset = load_dataset(dataset_name, **datasets_kwargs)
dataset = dataset.shuffle(seed=self.seed)
if num_samples is not None:
dataset = dataset.take(num_samples)
if preprocess_function is not None:
dataset = dataset.map(preprocess_function, batched=preprocess_batch)
return dataset
def _get_calibration_dataloader(
self,
dataset: Union["Dataset", List],
batch_size: Optional[int] = 1,
data_collator: Optional[DataCollator] = None,
remove_unused_columns: bool = False,
) -> OVDataLoader:
"""
Wrap dataset into a dataloader.
"""
if remove_unused_columns:
logger.warning("`remove_unused_columns` is deprecated and will be removed in optimum-intel v1.25.")
if not is_datasets_available():
raise ValueError(DATASETS_IMPORT_ERROR.format("OVCalibrationDatasetBuilder._get_calibration_dataloader"))
from datasets import Dataset, IterableDataset
data_collator = data_collator or default_data_collator
if remove_unused_columns and isinstance(dataset, Dataset):
dataset = self._remove_unused_columns(dataset)
sampler = None
if not isinstance(dataset, IterableDataset):
generator = torch.Generator()
generator.manual_seed(self.seed)
sampler = RandomSampler(dataset, generator=generator)
dataloader = DataLoader(
dataset, batch_size=batch_size, sampler=sampler, collate_fn=data_collator, drop_last=False
)
return OVDataLoader(dataloader)
def _prepare_decoder_calibration_data(
self, quantization_config: OVQuantizationConfigBase, dataloader: OVDataLoader
) -> OVCalibrationDataset:
"""
Prepares calibration data by collecting model inputs during inference.
"""
# Prefetch past_key_values
self.model.update_pkv_precision(True)
self.model.compile()
collected_inputs = []
num_samples = quantization_config.num_samples or 200
self.model.request = InferRequestWrapper(self.model.request, collected_inputs)
try:
for data in tqdm(dataloader, desc="Collecting calibration data", total=num_samples):
if len(collected_inputs) > num_samples:
break
self.model.generate(**data, max_new_tokens=1)
finally:
self.model.request = self.model.request.request
return OVCalibrationDataset(nncf.Dataset(collected_inputs))
def _prepare_causal_lm_calibration_data(
self, config: OVQuantizationConfigBase, seqlen: int = 32
) -> OVCalibrationDataset:
"""
Prepares calibration data for causal language models. Relies on `optimum.gptq.data` module.
"""
from optimum.gptq.data import get_dataset, prepare_dataset
tokenizer = AutoTokenizer.from_pretrained(config.tokenizer, trust_remote_code=config.trust_remote_code)
nsamples = config.num_samples if config.num_samples else 128
if isinstance(config.dataset, str):
if config.dataset == "auto":
generated_data = nncf.data.generate_text_data(self.model, tokenizer, dataset_size=nsamples)
calibration_dataset = [tokenizer(text, return_tensors="pt") for text in generated_data]
else:
calibration_dataset = get_dataset(config.dataset, tokenizer, seqlen=seqlen, nsamples=nsamples)
elif isinstance(config.dataset, list) and all(isinstance(it, str) for it in config.dataset):
calibration_dataset = [tokenizer(text, return_tensors="pt") for text in config.dataset[:nsamples]]
else:
raise ValueError("Please provide dataset as one of the accepted dataset labels or as a list of strings.")
calibration_dataset = prepare_dataset(calibration_dataset)
calibration_dataset = nncf.Dataset(calibration_dataset, lambda x: self.model.prepare_inputs(**x))
return OVCalibrationDataset(calibration_dataset)
def _prepare_visual_causal_lm_calibration_data(
self,
config: OVQuantizationConfigBase,
dataset: "Dataset",
max_image_size: Optional[int] = 600,
) -> OVCalibrationDataset:
"""
Prepares calibration data for VLM pipelines.
Currently, collects data only for a language model component.
"""
processor = AutoProcessor.from_pretrained(config.processor, trust_remote_code=config.trust_remote_code)
        try:
            tokenizer = AutoTokenizer.from_pretrained(config.tokenizer, trust_remote_code=config.trust_remote_code)
            tokenizer_error = None
        except Exception as e:
            # Keep a reference to the exception: the name bound by `except ... as` is cleared when the block exits
            tokenizer = None
            tokenizer_error = e
dataset_metadata = PREDEFINED_VISUAL_LM_DATASETS[config.dataset]
calibration_data = []
num_samples = config.num_samples or 32
for item in tqdm(dataset, desc="Collecting calibration dataset", total=num_samples):
if len(calibration_data) > num_samples:
break
instruction = item[dataset_metadata["inputs"]["instruction"]]
image_url = item[dataset_metadata["inputs"]["image_url"]]
image = Image.open(requests.get(image_url, stream=True).raw).convert("RGB")
if max_image_size is not None:
# To avoid large images, resize them keeping the aspect ratio
scale_factor = max(image.size[0] / max_image_size, image.size[1] / max_image_size)
if scale_factor > 1:
new_size = (int(image.size[0] / scale_factor), int(image.size[1] / scale_factor))
image = image.resize(new_size)
try:
inputs = self.model.preprocess_inputs(
text=instruction, image=image, processor=processor, tokenizer=tokenizer, config=self.model.config
)
except ValueError as value_error:
if "Tokenizer is required." in str(value_error) and tokenizer_error is not None:
raise tokenizer_error
raise value_error
inputs_embeds, attention_mask, position_ids = self.model.get_multimodal_embeddings(**inputs)
language_model_inputs = self.model.language_model.prepare_inputs(
input_ids=None,
attention_mask=attention_mask,
position_ids=position_ids,
inputs_embeds=inputs_embeds,
)
calibration_data.append(language_model_inputs)
return OVCalibrationDataset({"lm_model": nncf.Dataset(calibration_data)})
def _prepare_speech_to_text_calibration_data(
self, config: OVQuantizationConfigBase, dataset: "Dataset"
) -> OVCalibrationDataset:
"""
        Prepares calibration data for speech-to-text pipelines by running inference over a dataset and collecting
        the incurred model inputs.
"""
from optimum.intel.openvino.modeling_seq2seq import OVDecoder, OVEncoder
models: Dict[str, Union[OVEncoder, OVDecoder]] = {}
collected_inputs: Dict[str, List[Dict[str, Any]]] = {}
for submodel_name in self.model._ov_submodel_names:
ov_component: Union[OVEncoder, OVDecoder] = getattr(self.model, submodel_name)
models[submodel_name] = ov_component
collected_inputs[submodel_name] = []
ov_component._compile()
ov_component.request = InferRequestWrapper(
ov_component.request, collected_inputs[submodel_name], apply_caching=True
)
try:
processor = AutoProcessor.from_pretrained(config.processor, trust_remote_code=config.trust_remote_code)
# Download audio inputs beforehand to avoid possible connection issues
num_samples = config.num_samples or 32
dataset = list(tqdm(dataset.take(num_samples), desc="Downloading audio inputs", total=num_samples))
for item in tqdm(dataset, desc="Collecting calibration data"):
audio = item["audio"]["array"]
sampling_rate = item["audio"]["sampling_rate"]
input_features = processor(audio, sampling_rate=sampling_rate, return_tensors="pt").input_features
self.model.generate(input_features)
finally:
for model in models.values():
model.request = model.request.request
for model_name in collected_inputs:
collected_inputs[model_name] = nncf.Dataset(collected_inputs[model_name])
return OVCalibrationDataset(collected_inputs)
def _prepare_diffusion_calibration_data(
self, config: OVQuantizationConfigBase, dataset: Union[List, "Dataset"]
) -> OVCalibrationDataset:
"""
        Prepares calibration data for diffusion models by running inference over a dataset. Currently, data is
        collected only for the vision diffusion component (UNet or Transformer).
"""
self.model.compile()
diffuser_model_name = "unet" if self.model.unet is not None else "transformer"
diffuser = getattr(self.model, diffuser_model_name)
size = diffuser.config.get("sample_size", 64) * self.model.vae_scale_factor
height, width = 2 * (min(size, 512),)
num_samples = config.num_samples or 200
calibration_data = []
try:
self.disable_progress_bar(disable=True)
diffuser.request = InferRequestWrapper(diffuser.request, calibration_data)
pbar = tqdm(total=num_samples, desc="Collecting calibration data")
for item in dataset:
prompt = (
item[PREDEFINED_SD_DATASETS[config.dataset]["prompt_column_name"]]
if isinstance(item, dict)
else item
)
self.model(prompt, height=height, width=width)
pbar.update(min(num_samples, len(calibration_data)) - pbar.n)
if len(calibration_data) >= num_samples:
calibration_data = calibration_data[:num_samples]
break
finally:
diffuser.request = diffuser.request.request
self.disable_progress_bar(disable=False)
return OVCalibrationDataset({diffuser_model_name: nncf.Dataset(calibration_data[:num_samples])})
def _remove_unused_columns(self, dataset: "Dataset"):
# TODO: deprecate because model.forward() may not be the method which is called during inference,
# for example there is model.generate()
ignored_columns = list(set(dataset.column_names) - set(self._signature_columns))
return dataset.remove_columns(ignored_columns)
def disable_progress_bar(self, disable: bool = True) -> None:
if not hasattr(self.model, "_progress_bar_config"):
self.model._progress_bar_config = {"disable": disable}
else:
self.model._progress_bar_config["disable"] = disable
def _prepare_text_encoder_model_calibration_data(
self,
quantization_config: OVQuantizationConfigBase,
dataset: "Dataset",
seq_len: int = 128,
) -> OVCalibrationDataset:
"""
Prepares calibration data for text-encoder-like models.
Supports OVModelForFeatureExtraction, OVModelForMaskedLM and OVSentenceTransformer.
"""
from optimum.intel import OVModelForFeatureExtraction, OVModelForMaskedLM, OVSentenceTransformer
def get_tokenizer():
if isinstance(self.model, OVSentenceTransformer):
return self.model.tokenizer
else:
if quantization_config.tokenizer is None:
raise ValueError("Please provide tokenizer for calibration via quantization_config.tokenizer.")
tokenizer = AutoTokenizer.from_pretrained(
quantization_config.tokenizer, trust_remote_code=quantization_config.trust_remote_code
)
return tokenizer
self.model.compile()
num_samples = quantization_config.num_samples or 128
calibration_data = []
try:
inference_result_mock = {}
if isinstance(self.model, OVModelForFeatureExtraction):
inference_result_mock["last_hidden_state"] = np.empty((1,), np.float32)
elif isinstance(self.model, OVModelForMaskedLM):
inference_result_mock["logits"] = np.empty((1,), np.float32)
elif isinstance(self.model, OVSentenceTransformer):
inference_result_mock["token_embeddings"] = np.empty((1,), np.float32)
inference_result_mock["sentence_embedding"] = np.empty((1,), np.float32)
else:
raise RuntimeError(
f"Unsupported model type {type(self.model).__name__} for calibration dataset collection."
)
self.model.request = InferRequestWrapper(
self.model.request,
calibration_data,
inference_result_mock=inference_result_mock,
)
max_position_embeddings = getattr(self.model.config, "max_position_embeddings", None)
if max_position_embeddings is not None and max_position_embeddings > 0:
seq_len = min(seq_len, max_position_embeddings)
random_positions = None
if isinstance(self.model, OVModelForMaskedLM):
with numpy_seed(self.seed):
random_positions = np.random.randint(0, seq_len, num_samples)
tokenizer = None
pbar = tqdm(total=num_samples, desc="Collecting calibration data")
for item in dataset:
if "input_ids" in item:
# Assuming that dataset contains already preprocessed text
inputs = self._wrap_sample_as_array(item, add_batch_dim=True)
else:
tokenizer = tokenizer or get_tokenizer()
inputs = tokenizer(item["text"], truncation=True, max_length=seq_len, return_tensors="np")
if inputs["input_ids"].shape[1] < seq_len:
continue
if isinstance(self.model, OVModelForMaskedLM):
# Replace a random token with a mask token
inputs["input_ids"][0, random_positions[len(calibration_data)]] = tokenizer.mask_token_id
self.model(inputs) if isinstance(self.model, OVSentenceTransformer) else self.model(**inputs)
pbar.update(min(num_samples, len(calibration_data)) - pbar.n)
if len(calibration_data) >= num_samples:
break
finally:
self.model.request = self.model.request.request
return OVCalibrationDataset({"model": nncf.Dataset(calibration_data)})
def _prepare_text_image_encoder_model_calibration_data(
self,
quantization_config: OVQuantizationConfigBase,
dataset: "Dataset",
seq_len: int = 128,
) -> OVCalibrationDataset:
self.model.compile()
def get_processor():
processor = AutoProcessor.from_pretrained(
quantization_config.processor, trust_remote_code=quantization_config.trust_remote_code
)
return processor
max_position_embeddings = getattr(self.model.config, "max_position_embeddings", None)
if max_position_embeddings is not None and max_position_embeddings > 0:
seq_len = min(seq_len, max_position_embeddings)
num_samples = quantization_config.num_samples or 128
calibration_data = []
try:
inference_result_mock = {
"logits_per_image": np.empty((1,), np.float32),
"logits_per_text": np.empty((1,), np.float32),
"text_embeds": np.empty((1,), np.float32),
"image_embeds": np.empty((1,), np.float32),
}
self.model.request = InferRequestWrapper(
self.model.request,
calibration_data,
inference_result_mock=inference_result_mock,
)
processor = None
pbar = tqdm(total=num_samples, desc="Collecting calibration data")
for item in dataset:
if "input_ids" in item:
# Assuming that dataset contains already preprocessed text
inputs = self._wrap_sample_as_array(item, add_batch_dim=True)
else:
dataset_metadata = PREDEFINED_TEXT_IMAGE_ENCODER_DATASETS[quantization_config.dataset]
try:
response = requests.get(item[dataset_metadata["image_column_name"]], timeout=5)
response.raise_for_status()
image = Image.open(BytesIO(response.content))
except Exception:
continue
processor = processor or get_processor()
inputs = processor(
text=item[dataset_metadata["text_column_name"]],
images=image.convert("RGB"),
return_tensors="pt",
padding=True,
)
if inputs["input_ids"].shape[1] > seq_len:
inputs["input_ids"] = inputs["input_ids"][:, :seq_len]
self.model(**inputs)
pbar.update(min(num_samples, len(calibration_data)) - pbar.n)
if len(calibration_data) >= num_samples:
break
finally:
self.model.request = self.model.request.request
return OVCalibrationDataset({"model": nncf.Dataset(calibration_data)})
@staticmethod
def _wrap_sample_as_array(
sample: Dict[str, Any], add_batch_dim: bool = False
) -> Dict[str, Union[np.ndarray, torch.Tensor]]:
"""
Converts a sample to a dictionary of numpy arrays or torch tensors.
"""
results = {}
for k, v in sample.items():
v_as_array = v if isinstance(v, (torch.Tensor, np.ndarray)) else np.array(v)
if add_batch_dim:
v_as_array = v_as_array[None]
results[k] = v_as_array
return results
class OVQuantizer(OptimumQuantizer):
"""
Handle the NNCF quantization process.
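    Example (a minimal weight-only quantization sketch; `model_id` is a placeholder for an OpenVINO model):
    ```python
    >>> from optimum.intel import OVConfig, OVModelForCausalLM, OVWeightQuantizationConfig
    >>> model = OVModelForCausalLM.from_pretrained(model_id)
    >>> quantizer = OVQuantizer(model)
    >>> ov_config = OVConfig(quantization_config=OVWeightQuantizationConfig(bits=8))
    >>> quantizer.quantize(ov_config=ov_config, save_directory="./quantized_model")
    ```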
"""
def __init__(self, model: transformers.PreTrainedModel, task: Optional[str] = None, seed: int = 42, **kwargs):
"""
Args:
model (`transformers.PreTrainedModel`):
The [PreTrainedModel](https://huggingface.co/docs/transformers/main_classes/model#transformers.PreTrainedModel) to quantize.
task (`str`, defaults to None):
The task defining the model topology used for the ONNX export.
seed (`int`, defaults to 42):
The random seed to use when shuffling the calibration dataset.
"""
super().__init__()
self.model = model
self.task = task
self.dataset_builder = OVCalibrationDatasetBuilder(model, seed)
@classmethod
def from_pretrained(cls, model: PreTrainedModel, **kwargs):
# TODO : Create model
return cls(model, **kwargs)
def quantize(
self,
calibration_dataset: Optional[
            Union[OVCalibrationDataset, "Dataset", nncf.Dataset, str, List]
] = None,
save_directory: Optional[Union[str, Path]] = None,
ov_config: OVConfig = None,
file_name: Optional[str] = None,
batch_size: int = 1,
data_collator: Optional[DataCollator] = None,
remove_unused_columns: bool = False,
**kwargs,
):
"""
Quantize a model given the optimization specifications defined in `quantization_config`.
Args:
calibration_dataset (`datasets.Dataset` or `nncf.Dataset` or `Iterable`, *optional*):
A collection of data samples to use for quantization calibration. Is optional for weight-only
quantization and is required for full quantization.
save_directory (`Union[str, Path]`, *optional*):
The directory where the quantized model should be saved.
ov_config (`OVConfig`, *optional*):
The configuration containing the parameters related to quantization. If not provided, 8-bit symmetric
weight-only quantization will be applied.
file_name (`str`, *optional*):
The model file name to use when saving the model. Overwrites the default file name `"model.onnx"`.
batch_size (`int`, defaults to 1):
The number of calibration samples to load per batch.
data_collator (`DataCollator`, *optional*):
The function to use to form a batch from a list of elements of the calibration dataset.
remove_unused_columns (`bool`, defaults to `False`):
Whether to remove the columns unused by the model forward method.
Examples:
```python
>>> from optimum.intel import OVQuantizer, OVModelForCausalLM
>>> from transformers import AutoModelForCausalLM
>>> model = AutoModelForCausalLM.from_pretrained("databricks/dolly-v2-3b")
>>> quantizer = OVQuantizer.from_pretrained(model, task="text-generation")
>>> ov_config = OVConfig(quantization_config=OVWeightQuantizationConfig())
>>> quantizer.quantize(ov_config=ov_config, save_directory="./quantized_model")
>>> optimized_model = OVModelForCausalLM.from_pretrained("./quantized_model")
```
```python
>>> from optimum.intel import OVQuantizer, OVModelForSequenceClassification
>>> from transformers import AutoModelForSequenceClassification
>>> model = OVModelForSequenceClassification.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english", export=True)
>>> # or
>>> model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
>>> quantizer = OVQuantizer.from_pretrained(model, task="text-classification")
>>> ov_config = OVConfig(quantization_config=OVQuantizationConfig())
>>> quantizer.quantize(calibration_dataset=dataset, ov_config=ov_config, save_directory="./quantized_model")
>>> optimized_model = OVModelForSequenceClassification.from_pretrained("./quantized_model")
```
"""
if remove_unused_columns:
logger.warning("`remove_unused_columns` is deprecated and will be removed in optimum-intel v1.25.")
if isinstance(calibration_dataset, list):
logger.warning(
"Providing calibration dataset as a list is deprecated and will be removed in optimum-intel v1.25. "
"Please provide it as `datasets.Dataset` or as dictionary of `nncf.Dataset` instances."
)
if calibration_dataset is not None and isinstance(calibration_dataset, (dict, nncf.Dataset)):
calibration_dataset = OVCalibrationDataset(calibration_dataset)
if ov_config is None:
ov_config = OVConfig()
if not isinstance(ov_config, OVConfig):
raise TypeError(f"`ov_config` should be an `OVConfig`, but got: {type(ov_config)} instead.")
if ov_config.quantization_config is None:
logger.warning(
"`quantization_config` was not provided. In the future, please provide `quantization_config`"
)
if calibration_dataset is None:
logger.warning("Calibration dataset was not provided, assuming weight only quantization.")
ov_config.quantization_config = OVWeightQuantizationConfig(bits=8)
else:
logger.warning("Calibration dataset was provided, assuming static quantization.")
ov_config.quantization_config = OVQuantizationConfig()
quantization_config = ov_config.quantization_config
if quantization_config.dataset is not None and calibration_dataset is not None:
logger.info(
"Both `quantization_config.dataset` and `calibration_dataset` were provided for weight only "
"quantization. Will rely on `calibration_dataset`."
)
if is_diffusers_available():
from optimum.intel.openvino.modeling_diffusion import OVDiffusionPipeline
if calibration_dataset is not None and not isinstance(calibration_dataset, OVCalibrationDataset):
# Process custom calibration dataset
if (
is_diffusers_available()
and isinstance(self.model, OVDiffusionPipeline)
and is_datasets_available()
and isinstance(calibration_dataset, Dataset)
and "caption" in calibration_dataset.column_names
):
logger.warning(
"Assuming `caption` column should be used for calibration. This behavior will be deprecated in "
"optimum-intel v1.25. Please filter out the unnecessary columns before passing the dataset."
)
calibration_dataset = calibration_dataset.select_columns(["caption"])
if (
is_diffusers_available()
and isinstance(self.model, OVDiffusionPipeline)
and isinstance(calibration_dataset, list)
and all(isinstance(it, str) for it in calibration_dataset)
):
# To be deprecated
if quantization_config.dataset is not None:
raise ValueError(
"Both `calibration_dataset` and `quantization_config.dataset` are provided and the latter is "
"a list of strings. This behavior is ambiguous."
)
logger.warning(
"Providing calibration dataset for diffusion models a list of string will be deprecated "
"in optimum-intel v1.25. Please provide the list inside `quantization_config.dataset`"
"property instead."
)
quantization_config.dataset = calibration_dataset
calibration_dataset = None
else:
calibration_dataset = self.dataset_builder.build_from_dataset(
quantization_config, calibration_dataset, batch_size, data_collator, remove_unused_columns
)
from .modeling_base import OVBaseModel
if isinstance(self.model, OVBaseModel):
if self.model._compile_only:
raise ValueError(
"Quantization for `compile_only` model is not supported. Please load model with `compile_only=False`"
)
self._quantize_ovbasemodel(
ov_config,
save_directory,
calibration_dataset,
**kwargs,
)
elif isinstance(self.model, torch.nn.Module):
logger.warning(
"The support of `torch.nn.Module` will be deprecated in a future release of optimum-intel, please use the corresponding `OVModelForXxx` class to load you model."
"To convert a PyTorch model to OpenVINO, you can set `export=True` when loading your model as `OVModelForXxx.from_pretrained(..., export=True)`"
)
self._quantize_torchmodel(
ov_config,
save_directory,
calibration_dataset,
file_name,
**kwargs,
)
else:
raise TypeError(f"Unsupported model type: {type(self.model)}")
def _quantize_ovbasemodel(
self,
ov_config: OVConfig,
save_directory: Union[str, Path] = None,
calibration_dataset: Optional[OVCalibrationDataset] = None,
**kwargs,
):
from optimum.intel.openvino.modeling_seq2seq import _OVModelForWhisper
from optimum.intel.openvino.modeling_visual_language import OVModelForVisualCausalLM
if is_diffusers_available():
from optimum.intel.openvino.modeling_diffusion import OVDiffusionPipeline
quantization_config = ov_config.quantization_config
if calibration_dataset is None and quantization_config.dataset is not None:
calibration_dataset = self.dataset_builder.build_from_quantization_config(quantization_config)
quantization_configs = {}
if isinstance(quantization_config, OVPipelineQuantizationConfig):
quantization_configs = quantization_config.quantization_configs
elif (
isinstance(quantization_config, OVWeightQuantizationConfig)
and quantization_config.quant_method != OVQuantizationMethod.HYBRID
):
#
# Regular (non-hybrid) weight-only quantization
#
if is_diffusers_available() and isinstance(self.model, OVDiffusionPipeline):
for submodel_name in self.model.ov_submodels:
quantization_configs[submodel_name] = quantization_config
elif isinstance(self.model, OVModelForVisualCausalLM):
for submodel_name in self.model.ov_submodels:
quantization_configs[submodel_name] = (
quantization_config
if submodel_name == "lm_model"
else OVWeightQuantizationConfig(bits=8, sym=True)
)
else:
quantization_configs["model"] = quantization_config
else:
#
# Hybrid/Full/Mixed quantization
#
if calibration_dataset is None:
raise ValueError("Calibration dataset is required to run data-aware quantization.")
if (
isinstance(quantization_config, OVWeightQuantizationConfig)
and quantization_config.quant_method == OVQuantizationMethod.HYBRID
):
#
# Hybrid quantization
#
if is_diffusers_available() and isinstance(self.model, OVDiffusionPipeline):
if len(calibration_dataset) > 1:
raise ValueError("Calibration datasets for Diffusion models should contain only one value.")
# Apply hybrid quantization to diffusion model
diffusion_model_name = next(iter(calibration_dataset))
diffusion_model = getattr(self.model, diffusion_model_name).model
quantization_configs[diffusion_model_name] = _get_hybrid_mixed_quantization_config(
diffusion_model, quantization_config, **kwargs
)
# Apply weight-only quantization to all SD submodels except UNet/Transformer
quantization_config_copy = quantization_config.clone()
quantization_config_copy.dataset = None
quantization_config_copy.quant_method = OVQuantizationMethod.DEFAULT
for submodel_name in self.model.ov_submodels:
if submodel_name != diffusion_model_name:
quantization_configs[submodel_name] = quantization_config_copy
else:
# The model may be for example OVModelForImageClassification, OVModelForAudioClassification, etc.
quantization_configs["model"] = quantization_config
elif isinstance(quantization_config, OVQuantizationConfig):
#
# Full quantization
#
if isinstance(self.model, _OVModelForWhisper):
for submodel_name in self.model.ov_submodels:
                    # Each of the quantization_config.num_samples audio samples produces multiple actual model inputs
config = quantization_config.clone()
config.num_samples = calibration_dataset[submodel_name].get_length()
quantization_configs[submodel_name] = config
elif is_diffusers_available() and isinstance(self.model, OVDiffusionPipeline):
diffusion_model_name = next(iter(calibration_dataset))
quantization_configs[diffusion_model_name] = quantization_config
for submodel_name in self.model.ov_submodels:
if submodel_name != diffusion_model_name:
quantization_configs[submodel_name] = OVWeightQuantizationConfig(bits=8)
elif isinstance(self.model, OVModelForVisualCausalLM):
for submodel_name in self.model.ov_submodels:
quantization_configs[submodel_name] = (
quantization_config
if submodel_name == "lm_model"
else OVWeightQuantizationConfig(bits=8, sym=True)
)
else:
quantization_configs["model"] = quantization_config
elif isinstance(quantization_config, OVMixedQuantizationConfig):
#
# Mixed quantization
#
if is_diffusers_available() and isinstance(self.model, OVDiffusionPipeline):
raise NotImplementedError("Mixed precision quantization isn't supported for diffusers.")
quantization_configs["model"] = quantization_config
else:
raise ValueError(f"Unsupported type of quantization config: {type(quantization_config)}")
for submodel_name, config in quantization_configs.items():
if submodel_name not in self.model.ov_submodels:
raise RuntimeError(
f"Unexpected submodel name encountered during applying quantization: {submodel_name}. "
f"Available submodels: {list(self.model.ov_submodels.keys())}."
)
submodel = self.model.ov_submodels[submodel_name]
nncf_dataset = calibration_dataset.get(submodel_name, None) if calibration_dataset else None
if isinstance(config, OVWeightQuantizationConfig) and config.quant_method == OVQuantizationMethod.HYBRID:
config = _get_hybrid_mixed_quantization_config(submodel, config, **kwargs)
if isinstance(config, OVWeightQuantizationConfig):
# Weight only quantization is performed in-place
_weight_only_quantization(submodel, config, nncf_dataset, **kwargs)
elif isinstance(config, (OVQuantizationConfig, OVMixedQuantizationConfig)):
if nncf_dataset is None:
raise ValueError(
f"Calibration dataset for submodel {submodel_name} is required to run quantization."
)
if isinstance(config, OVQuantizationConfig):
quantized_model = _full_quantization(submodel, config, nncf_dataset, **kwargs)
else:
quantized_model = _mixed_quantization(submodel, config, nncf_dataset, **kwargs)
# Replace the original model with the quantized model
if isinstance(getattr(self.model, submodel_name), openvino.Model):
setattr(self.model, submodel_name, quantized_model)
elif isinstance(getattr(getattr(self.model, submodel_name), "model"), openvino.Model):
setattr(getattr(self.model, submodel_name), "model", quantized_model)
else:
raise RuntimeError("Can't locate OpenVINO model to replace it with the quantized one.")
else:
raise ValueError(f"Unsupported type of quantization config: {type(config)}.")
self.model.clear_requests()
if save_directory is not None:
save_directory = Path(save_directory)
save_directory.mkdir(parents=True, exist_ok=True)
self.model.save_pretrained(save_directory)
ov_config.save_pretrained(save_directory)
def _quantize_torchmodel(
self,
ov_config: OVConfig,
save_directory: Union[str, Path],
calibration_datasets: Optional[OVCalibrationDataset] = None,
file_name: Optional[str] = None,
**kwargs,
):
if save_directory is None:
# TODO : can be set to self.model.config.name_or_path for OVModels when not provided
raise ValueError("`save_directory` needs to be specified")
self._set_task()
save_directory = Path(save_directory)
save_directory.mkdir(parents=True, exist_ok=True)
ov_file_name = file_name if file_name is not None else OV_XML_FILE_NAME
output_path = save_directory.joinpath(ov_file_name)
output_path = output_path.with_suffix(".xml").as_posix()
model_type = self.model.config.model_type.replace("_", "-")
onnx_config_class = TasksManager.get_exporter_config_constructor(
exporter="openvino",
model=self.model,
task=self.task,
model_type=model_type,
)
save_onnx_model = ov_config.save_onnx_model
onnx_file_name = (
ONNX_WEIGHTS_NAME if file_name is None and save_onnx_model else Path(ov_file_name).with_suffix(".onnx")
)
task = self.task
model = self.model
self.model.config.save_pretrained(save_directory)
if task.startswith("text-generation"):
onnx_config = onnx_config_class(
model.config, use_past=model.config.use_cache, use_past_in_inputs=model.config.use_cache
)
if model.config.use_cache:
task = "text-generation-with-past"
else:
onnx_config = onnx_config_class(model.config)
stateful = ensure_stateful_is_available() and ensure_export_task_support_stateful(task)
quantization_config = ov_config.quantization_config
if isinstance(quantization_config, OVWeightQuantizationConfig):
from optimum.exporters.utils import check_dummy_inputs_are_allowed
if stateful:
# patch model before weight compression
model = patch_model_with_bettertransformer(model)
dummy_inputs = onnx_config.generate_dummy_inputs(framework="pt")
device = get_model_device(model)
dummy_inputs = tree_map(
lambda value: value.to(device) if isinstance(value, torch.Tensor) else value, dummy_inputs
)
check_dummy_inputs_are_allowed(model, dummy_inputs)
nncf.compress_weights(model, dataset=nncf.Dataset([dummy_inputs]))
else:
if not isinstance(quantization_config, OVQuantizationConfig):
raise ValueError(f"Unsupported type of quantization config: {type(quantization_config)}")
if stateful:
logger.warning(
"Quantization algorithm does not support optimized stateful models. "
"The original model without optimization will be quantized and exported."
)
stateful = False
if calibration_datasets is None:
raise ValueError("Calibration dataset is required to run quantization.")
if "model" not in calibration_datasets:
raise RuntimeError("Calibration dataset should contain a key 'model' with a dataset.")
model = nncf.quantize(
model,
calibration_datasets["model"],
subset_size=quantization_config.num_samples or 128,
ignored_scope=quantization_config.get_ignored_scope_instance(),
model_type=nncf.ModelType(quantization_config.model_type),
preset=(
nncf.QuantizationPreset.PERFORMANCE if quantization_config.sym else nncf.QuantizationPreset.MIXED
),
fast_bias_correction=quantization_config.fast_bias_correction,
advanced_parameters=nncf.AdvancedQuantizationParameters(
overflow_fix=OverflowFix(quantization_config.overflow_fix)
),
**kwargs,
)
model_path = save_directory / (onnx_file_name if save_onnx_model else ov_file_name)
onnx_path = save_directory / onnx_file_name
export_fn = export if not save_onnx_model else export_pytorch_via_onnx
opset = min(onnx_config.DEFAULT_ONNX_OPSET, MAX_ONNX_OPSET)
opset = max(opset, MIN_ONNX_QDQ_OPSET)
export_kwargs = {}
if not save_onnx_model:
export_kwargs = {"stateful": stateful}
_, _, is_onnx = export_fn(model=model, config=onnx_config, output=model_path, opset=opset, **export_kwargs)
if is_onnx:
# Load and save the compressed model
model = core.read_model(onnx_path)
            # The model requires a second save to apply the weight compression transformations
self._save_pretrained(model, output_path)
# if onnx conversion happens as fallback for pytorch conversion, remove onnx model
if not save_onnx_model:
os.remove(onnx_path)
try:
os.remove(f"{onnx_path}_data")
except FileNotFoundError:
pass
ov_config.save_pretrained(save_directory)
@staticmethod
def _save_pretrained(model: openvino.Model, output_path: str):
compress_quantize_weights_transformation(model)
openvino.save_model(model, output_path, compress_to_fp16=False)
def _set_task(self):
if self.task is None:
self.task = TasksManager.infer_task_from_model(self.model.config._name_or_path)
if self.task is None:
raise ValueError(
"The task defining the model topology could not be extracted and needs to be specified for the ONNX export."
)
self.task = _TASK_ALIASES.get(self.task, self.task)
if self.task == "text2text-generation":
raise ValueError("Seq2Seq models are currently not supported for post-training static quantization.")
if self.task == "image-to-text":
raise ValueError("Image2Text models are currently not supported for post-training static quantization.")
def get_calibration_dataset(
self,
dataset_name: str,
num_samples: Optional[int] = 100,
dataset_config_name: Optional[str] = None,
dataset_split: str = "train",
preprocess_function: Optional[Callable] = None,
preprocess_batch: bool = True,
token: Optional[Union[bool, str]] = None,
cache_dir: str = HUGGINGFACE_HUB_CACHE,
trust_remote_code: bool = False,
streaming: bool = False,
) -> "Dataset":
"""
Create the calibration `datasets.Dataset` to use for the post-training static quantization calibration step.
Args:
dataset_name (`str`):
The dataset repository name on the Hugging Face Hub or path to a local directory containing data files
in generic formats and optionally a dataset script, if it requires some code to read the data files.
num_samples (`int`, defaults to 100):
The maximum number of samples composing the calibration dataset.
dataset_config_name (`str`, *optional*):
The name of the dataset configuration.
dataset_split (`str`, defaults to `"train"`):
Which split of the dataset to use to perform the calibration step.
preprocess_function (`Callable`, *optional*):
Processing function to apply to each example after loading dataset.
preprocess_batch (`bool`, defaults to `True`):
Whether the `preprocess_function` should be batched.
token (Optional[Union[bool, str]], defaults to `None`):
The token to use as HTTP bearer authorization for remote files. If `True`, will use the token generated
when running `huggingface-cli login` (stored in `~/.huggingface`).
cache_dir (`str`, *optional*):
Caching directory for a calibration dataset.
trust_remote_code (`bool`, defaults to `False`):
Whether or not to allow for custom models defined on the Hub in their own modeling files. This option
should only be set to `True` for repositories you trust and in which you have read the code, as it will
execute code present on the Hub on your local machine.
streaming (`bool`, defaults to `False`):
Whether to load dataset in streaming mode.
Returns:
The calibration `datasets.Dataset` to use for the post-training static quantization calibration step.
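        Example (a sketch; `quantizer` is an `OVQuantizer` instance and `preprocess_fn` is a user-defined
        tokenization function):
        ```python
        >>> calibration_dataset = quantizer.get_calibration_dataset(
        ...     "glue",
        ...     dataset_config_name="sst2",
        ...     dataset_split="train",
        ...     num_samples=300,
        ...     preprocess_function=preprocess_fn,
        ... )
        ```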
"""
# TODO: consider in the future for this method to return OVCalibrationDataset instance from either datasets.Dataset instance or its name as input.
# This way OVQuantizer.quantize() will accept fully ready OVCalibrationDataset instance and `batch_size` and `data_collator` arguments can be removed.
# Example usage in such scenario:
# ```
# calibration_dataset: OVCalibrationDataset = ov_quantizer.get_calibration_dataset(ov_config, dataset_name, ..., batch_size, data_collator)
# ov_quantizer.quantize(calibration_dataset, ov_config)
# ```
return self.dataset_builder.load_dataset(
dataset_name,
num_samples,
dataset_config_name,
dataset_split,
preprocess_function,
preprocess_batch,
token,
cache_dir,
trust_remote_code,
streaming,
)
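# Illustrative usage sketch for get_calibration_dataset(); the model id, dataset and preprocessing below are
# assumptions chosen for demonstration only and are not part of this module:
#
#     from transformers import AutoModelForSequenceClassification, AutoTokenizer
#     from optimum.intel import OVQuantizer
#
#     model_id = "distilbert-base-uncased-finetuned-sst-2-english"
#     model = AutoModelForSequenceClassification.from_pretrained(model_id)
#     tokenizer = AutoTokenizer.from_pretrained(model_id)
#     quantizer = OVQuantizer.from_pretrained(model)
#     calibration_dataset = quantizer.get_calibration_dataset(
#         "glue",
#         dataset_config_name="sst2",
#         num_samples=300,
#         dataset_split="train",
#         preprocess_function=lambda x: tokenizer(x["sentence"], padding="max_length", truncation=True),
#     )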
def _weight_only_quantization(
model: openvino.Model,
quantization_config: Union[OVWeightQuantizationConfig, Dict],
calibration_dataset: Optional[Union[nncf.Dataset, Iterable]] = None,
**kwargs,
) -> openvino.Model:
_verify_not_optimized(model)
config = quantization_config
if isinstance(config, dict):
config = OVWeightQuantizationConfig.from_dict(quantization_config)
if not isinstance(config, OVWeightQuantizationConfig):
raise ValueError(
f"Expected quantization config to be an instance of `OVWeightQuantizationConfig`, but got {type(config)}."
)
dataset = None
if calibration_dataset is not None:
if is_datasets_available() and isinstance(calibration_dataset, Dataset):
raise ValueError(
"Providing calibration dataset as an instance of `datasets.Dataset` for OV weight-only "
"quantization is not supported. Please provide it as `nncf.Dataset` or as iterable of "
"model inputs."
)
elif isinstance(calibration_dataset, nncf.Dataset):
dataset = calibration_dataset
else:
# This path should no longer be used; a deprecation warning is emitted just in case
logger.warning("Providing calibration dataset as an iterable will be deprecated in optimum-intel v1.25.")
dataset = nncf.Dataset(calibration_dataset)
wc_kwargs = config.to_nncf_dict()
# Arguments provided in kwargs override the ones from the config
kwargs_intersection = set(wc_kwargs.keys()) & set(kwargs.keys())
if kwargs_intersection:
logger.warning(
f"The following nncf.compress_weights() arguments from the OVWeightQuantizationConfig will be overridden "
f"by the ones given in _weight_only_quantization call kwargs: {kwargs_intersection}."
)
wc_kwargs.update(kwargs)
wc_kwargs.pop("weight_only", None)
compressed_model = nncf.compress_weights(
model,
dataset=dataset,
**wc_kwargs,
)
_remove_f16_kv_cache_precision_flag(compressed_model)
_add_nncf_version_flag(compressed_model)
return compressed_model
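# Worked example of the kwargs-override behaviour above (values are hypothetical): with
# quantization_config = OVWeightQuantizationConfig(bits=4, ratio=0.8), config.to_nncf_dict() contains
# ratio=0.8; calling _weight_only_quantization(model, quantization_config, ratio=1.0) logs the override
# warning and passes ratio=1.0 to nncf.compress_weights().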
def _full_quantization(
model: openvino.Model,
quantization_config: OVQuantizationConfig,
calibration_dataset: nncf.Dataset,
verify_not_optimized: bool = True,
**kwargs,
):
if not isinstance(quantization_config, OVQuantizationConfig):
raise ValueError(
f"Expected quantization config to be an instance of `OVQuantizationConfig`, but got {type(quantization_config)}."
)
if verify_not_optimized:
_verify_not_optimized(model)
q_kwargs = quantization_config.to_nncf_dict()
# Arguments provided in kwargs override the ones from the config
kwargs_intersection = set(q_kwargs.keys()) & set(kwargs.keys())
if kwargs_intersection:
logger.warning(
f"The following nncf.quantize() arguments from the OVQuantizationConfig will be overridden "
f"by the ones given in _full_quantization call kwargs: {kwargs_intersection}."
)
q_kwargs.update(kwargs)
q_kwargs.pop("weight_only", None)
quantized_model = nncf.quantize(model, calibration_dataset=calibration_dataset, **q_kwargs)
_remove_f16_kv_cache_precision_flag(quantized_model)
_add_nncf_version_flag(quantized_model)
return quantized_model
def _get_operation_const_op(operation, const_port_id: int):
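"""
Traverses from the node feeding `const_port_id` of `operation` through Convert/FakeQuantize/Reshape nodes
and returns the first Constant node found, or None if there is no constant on that path.
"""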
node = operation.input_value(const_port_id).get_node()
queue = deque([node])
constant_node = None
allowed_propagation_types_list = ["Convert", "FakeQuantize", "Reshape"]
while len(queue) != 0:
curr_node = queue.popleft()
if curr_node.get_type_name() == "Constant":
constant_node = curr_node
break
if len(curr_node.inputs()) == 0:
break
if curr_node.get_type_name() in allowed_propagation_types_list:
queue.append(curr_node.input_value(0).get_node())
return constant_node
def _is_embedding(node) -> bool:
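"""
Returns True if the node's first input is a floating-point (f16/f32/f64) tensor backed by a Constant,
which identifies Gather operations acting as embedding lookups.
"""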
allowed_types_list = ["f16", "f32", "f64"]
const_port_id = 0
input_tensor = node.input_value(const_port_id)
if input_tensor.get_element_type().get_type_name() in allowed_types_list:
const_node = _get_operation_const_op(node, const_port_id)
if const_node is not None:
return True
return False
def _collect_ops_with_weights(model):
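"""
Collects the friendly names of MatMul operations with constant weights and of Gather operations acting as
embeddings. These are the operations whose weights are compressed during hybrid quantization.
"""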
ops_with_weights = []
for op in model.get_ops():
if op.get_type_name() == "MatMul":
constant_node_0 = _get_operation_const_op(op, const_port_id=0)
constant_node_1 = _get_operation_const_op(op, const_port_id=1)
if constant_node_0 or constant_node_1:
ops_with_weights.append(op.get_friendly_name())
if op.get_type_name() == "Gather" and _is_embedding(op):
ops_with_weights.append(op.get_friendly_name())
return ops_with_weights
def _get_hybrid_mixed_quantization_config(
model: openvino.Model,
quantization_config: OVWeightQuantizationConfig,
**kwargs,
) -> OVMixedQuantizationConfig:
"""
Returns a mixed quantization config representing hybrid quantization.
Args:
model (`openvino.Model`):
The OpenVINO model for applying quantization.
quantization_config (`OVWeightQuantizationConfig`):
The configuration containing the parameters related to quantization.
Returns:
The mixed quantization config representing hybrid quantization.
"""
# Hybrid quantization means that we quantize
# (1) weights of MatMul and Embedding layers
# (2) activations of other layers.
wc_config = quantization_config.clone()
wc_config.ignored_scope = {}
if any(op.get_type_name() == "Convolution" for op in model.get_ops()):
wc_config.ignored_scope["types"] = ["Convolution"]
q_config_ignored_scope = {"names": _collect_ops_with_weights(model)}
q_config = OVQuantizationConfig(
ignored_scope=q_config_ignored_scope,
num_samples=quantization_config.num_samples or 200,
smooth_quant_alpha=-1,
**kwargs,
)
mixed_quantization_config = OVMixedQuantizationConfig(
weight_quantization_config=wc_config,
full_quantization_config=q_config,
ignored_scope=quantization_config.ignored_scope,
**kwargs,
)
return mixed_quantization_config
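# Illustrative sketch of the resulting split (the config values are hypothetical):
#
#     wq_config = OVWeightQuantizationConfig(bits=8, num_samples=224)
#     mixed_config = _get_hybrid_mixed_quantization_config(ov_model, wq_config)
#
# mixed_config.weight_quantization_config compresses the weights of the weighted layers (Convolutions, if
# present, are placed in its ignored scope), while mixed_config.full_quantization_config skips the
# MatMul/Gather nodes collected above and quantizes the activations of the remaining layers.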
def _mixed_quantization(
model: openvino.Model,
quantization_config: OVMixedQuantizationConfig,
dataset: nncf.Dataset,
**kwargs,
) -> openvino.Model:
"""
Perform mixed-precision quantization where we separately quantize:
(1) the weights of weighted layers, to the precision given in `quantization_config.weight_quantization_config`, and
(2) the weights and activations of the remaining layers, to the precision given in `quantization_config.full_quantization_config`.
By default, the weights of all weighted layers are quantized in the first step. In the second step, the activations of
both weighted and non-weighted layers are quantized. If some layers are instructed to be ignored in the first step
via the `weight_quantization_config.ignored_scope` parameter, both the weights and activations of these layers are
quantized to the precision given in `full_quantization_config`.
Args:
model (`openvino.Model`):
The OpenVINO Runtime model for applying quantization.
quantization_config (`OVMixedQuantizationConfig`):
The configuration containing the parameters related to quantization.
dataset (`nncf.Dataset`):
The dataset used for quantization.
Returns:
The OpenVINO Runtime model with applied quantization.
"""
def merge_ignored_scopes(
ignored_scope_1: Union[Dict[str, List[str]], None], ignored_scope_2: Union[Dict[str, List[str]], None]
) -> Dict[str, List[str]]:
if ignored_scope_1 is None:
return copy.deepcopy(ignored_scope_2) if ignored_scope_2 is not None else None
if ignored_scope_2 is None:
return copy.deepcopy(ignored_scope_1)
merged_ignored_scope = {}
for key in set(ignored_scope_1) | set(ignored_scope_2):
merged_ignored_scope[key] = list(set(ignored_scope_1.get(key, []) + ignored_scope_2.get(key, [])))
return merged_ignored_scope
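# Worked example (hypothetical scopes; the order of the merged entries is not guaranteed):
#     merge_ignored_scopes({"names": ["lm_head"]}, {"names": ["mlp.down_proj"], "types": ["Convolution"]})
#     -> {"names": ["lm_head", "mlp.down_proj"], "types": ["Convolution"]}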
wc_config = quantization_config.weight_quantization_config.clone()
wc_config.ignored_scope = merge_ignored_scopes(wc_config.ignored_scope, quantization_config.ignored_scope)
wc_dataset = dataset if wc_config.bits != 8 else None
compressed_model = _weight_only_quantization(model, wc_config, wc_dataset, **kwargs)
q_config = quantization_config.full_quantization_config.clone()
q_config.ignored_scope = merge_ignored_scopes(q_config.ignored_scope, quantization_config.ignored_scope)
quantized_model = _full_quantization(compressed_model, q_config, dataset, verify_not_optimized=False, **kwargs)
return quantized_model
def _verify_not_optimized(ov_model):
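"""
Raises a RuntimeError if the model's runtime info shows that NNCF weight compression or quantization has
already been applied to it.
"""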
message_template = (
"Cannot apply optimization to the model because it was already optimized with the following config: {}. "
"To avoid this issue, check that you set load_in_8bit=False or not using quantization_config at export in the .from_pretrained(), "
"or explicitly specify weight format with --weight_format fp16/fp32 when using CLI."
)
rt_info = ov_model.get_rt_info()
if "nncf" in rt_info:
model_weight_compression_config = rt_info["nncf"].get("weight_compression", None)
model_quantization_config = rt_info["nncf"].get("quantization", None)
if model_weight_compression_config is not None:
raise RuntimeError(message_template.format(model_weight_compression_config))
elif model_quantization_config is not None:
raise RuntimeError(message_template.format(model_quantization_config))
def _remove_f16_kv_cache_precision_flag(model: openvino.Model) -> openvino.Model:
# Remove the KV cache compression disabling flag from the model
if model.has_rt_info(["runtime_options", "KV_CACHE_PRECISION"]):
prev_rt_info = model.get_rt_info("runtime_options").value
if prev_rt_info["KV_CACHE_PRECISION"] == "f16":
prev_rt_info.pop("KV_CACHE_PRECISION")
model.set_rt_info(prev_rt_info, "runtime_options")
return model
def _add_nncf_version_flag(model: openvino.Model) -> openvino.Model:
model.set_rt_info(_nncf_version, ["optimum", "nncf_version"])
return model
@contextmanager
def numpy_seed(seed: int):
"""
Context manager to set the numpy random seed.
"""
old_state = np.random.get_state()
np.random.seed(seed)
try:
yield
finally:
np.random.set_state(old_state)
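# Illustrative usage (the seed and sample size are arbitrary): draw reproducible indices without
# permanently altering the global numpy RNG state.
#
#     with numpy_seed(42):
#         indices = np.random.permutation(1000)[:128]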