optimum/intel/openvino/modeling_sentence_transformers.py (128 lines of code) (raw):

from pathlib import Path from types import MethodType from typing import Dict, List, Optional, Tuple, Union import numpy as np import torch from huggingface_hub.constants import HUGGINGFACE_HUB_CACHE from sentence_transformers import SentenceTransformer from transformers import AutoTokenizer, PretrainedConfig from transformers.file_utils import add_start_docstrings from .. import OVConfig from .configuration import OVQuantizationConfigBase from .modeling import MODEL_START_DOCSTRING, OVModel @add_start_docstrings( """ OpenVINO Model for feature extraction tasks for Sentence Transformers. """, MODEL_START_DOCSTRING, ) class OVSentenceTransformer(OVModel): export_feature = "feature-extraction" _library_name = "sentence_transformers" def __init__(self, model=None, config=None, tokenizer=None, **kwargs): super().__init__(model, config, **kwargs) self.encode = MethodType(SentenceTransformer.encode, self) self._text_length = MethodType(SentenceTransformer._text_length, self) self.default_prompt_name = None self.truncate_dim = None self.tokenizer = tokenizer def _save_pretrained(self, save_directory: Union[str, Path]): super()._save_pretrained(save_directory) self.tokenizer.save_pretrained(save_directory) def forward(self, inputs: Dict[str, torch.Tensor]): self.compile() input_ids = inputs.get("input_ids") attention_mask = inputs.get("attention_mask") token_type_ids = inputs.get("token_type_ids") np_inputs = isinstance(input_ids, np.ndarray) if not np_inputs: input_ids = input_ids.cpu().numpy() attention_mask = attention_mask.cpu().numpy() token_type_ids = token_type_ids.cpu().numpy() if token_type_ids is not None else token_type_ids inputs = { "input_ids": input_ids, "attention_mask": attention_mask, } # Add the token_type_ids when needed if "token_type_ids" in self.input_names: inputs["token_type_ids"] = token_type_ids if token_type_ids is not None else np.zeros_like(input_ids) outputs = self._inference(inputs) return { "token_embeddings": torch.from_numpy(outputs["token_embeddings"]).to(self.device), "sentence_embedding": torch.from_numpy(outputs["sentence_embedding"]).to(self.device), } @classmethod def _from_pretrained( cls, model_id: Union[str, Path], config: PretrainedConfig, token: Optional[Union[bool, str]] = None, revision: Optional[str] = None, force_download: bool = False, cache_dir: str = HUGGINGFACE_HUB_CACHE, file_name: Optional[str] = None, subfolder: str = "", from_onnx: bool = False, local_files_only: bool = False, load_in_8bit: bool = False, quantization_config: Union[OVQuantizationConfigBase, Dict] = None, **kwargs, ): trust_remote_code = kwargs.pop("trust_remote_code", False) tokenizer_kwargs = kwargs.pop("tokenizer_kwargs", None) tokenizer_args = { "token": token, "trust_remote_code": trust_remote_code, "revision": revision, "local_files_only": local_files_only, } if tokenizer_kwargs: kwargs["tokenizer_args"].update(tokenizer_kwargs) tokenizer = AutoTokenizer.from_pretrained(model_id, **tokenizer_args) model = super()._from_pretrained( model_id=model_id, config=config, token=token, revision=revision, force_download=force_download, cache_dir=cache_dir, file_name=file_name, subfolder=subfolder, from_onnx=from_onnx, local_files_only=local_files_only, tokenizer=tokenizer, **kwargs, ) quantization_config = cls._prepare_quantization_config(quantization_config, load_in_8bit) if quantization_config is not None: from optimum.intel import OVQuantizer quantizer = OVQuantizer(model) quantizer.quantize(ov_config=OVConfig(quantization_config=quantization_config)) return model def tokenize( self, texts: Union[List[str], List[Dict], List[Tuple[str, str]]], padding: Union[str, bool] = True ) -> Dict[str, torch.Tensor]: """Tokenizes a text and maps tokens to token-ids""" output = {} if isinstance(texts[0], str): to_tokenize = [texts] elif isinstance(texts[0], dict): to_tokenize = [] output["text_keys"] = [] for lookup in texts: text_key, text = next(iter(lookup.items())) to_tokenize.append(text) output["text_keys"].append(text_key) to_tokenize = [to_tokenize] else: batch1, batch2 = [], [] for text_tuple in texts: batch1.append(text_tuple[0]) batch2.append(text_tuple[1]) to_tokenize = [batch1, batch2] # strip to_tokenize = [[str(s).strip() for s in col] for col in to_tokenize] output.update( self.tokenizer( *to_tokenize, padding=padding, truncation="longest_first", return_tensors="pt", ) ) return output