optimum/intel/openvino/modeling_open_clip.py (533 lines of code) (raw):

# Copyright 2024 The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import logging import os from pathlib import Path from typing import Dict, Optional, Union import numpy as np import torch from huggingface_hub import hf_hub_download from huggingface_hub.constants import HUGGINGFACE_HUB_CACHE from torch.nn import functional as F from transformers import ( CLIPConfig, PretrainedConfig, ) from transformers.file_utils import add_start_docstrings from transformers.modeling_outputs import ModelOutput from transformers.models.clip.modeling_clip import CLIPOutput from transformers.utils import is_offline_mode from optimum.exporters.tasks import TasksManager from ...exporters.openvino import main_export from ..utils.modeling_utils import _find_files_matching_pattern, _OpenClipForZeroShotImageClassification from .configuration import OVConfig, OVWeightQuantizationConfig from .modeling import MODEL_START_DOCSTRING, OVModel from .utils import TemporaryDirectory logger = logging.getLogger(__name__) class OVModelOpenCLIPBase(OVModel): config_name = "open_clip_config.json" _library_name = "open_clip" def __init__(self, model=None, config=None, **kwargs): super().__init__(model, config, **kwargs) @classmethod def _load_config( cls, config_name_or_path: Union[str, os.PathLike], revision: Optional[str] = None, cache_dir: str = HUGGINGFACE_HUB_CACHE, token: Optional[Union[bool, str]] = None, force_download: bool = False, subfolder: str = "", trust_remote_code: bool = False, local_files_only: bool = False, ) -> PretrainedConfig: config_path = None config_dir = os.path.join(config_name_or_path, subfolder) all_files, _ = TasksManager.get_model_files( config_name_or_path, subfolder=subfolder, cache_dir=cache_dir, revision=revision, token=token ) transformers_config_name = "config.json" config_name = None if cls.config_name in all_files: config_name = cls.config_name elif transformers_config_name in all_files: config_name = transformers_config_name if os.path.isdir(config_dir): if config_name is None: raise OSError( f"neither {cls.config_name} nor {transformers_config_name} was found in {config_dir} local folder" ) config_path = os.path.join(config_dir, config_name) else: if config_name: config_path = hf_hub_download( repo_id=config_name_or_path, filename=config_name, subfolder=subfolder, token=token, revision=revision, cache_dir=cache_dir, force_download=force_download, local_files_only=local_files_only, ) else: open_clip_config = _OpenClipForZeroShotImageClassification.find_config_by_hub_url(config_name_or_path) if config_path: open_clip_config = {} with open(config_path, "r", encoding="utf-8") as f: open_clip_config = json.load(f) model_cfg = open_clip_config.get("model_cfg", open_clip_config) text_config_dict = model_cfg.get("text_cfg", None) or model_cfg.get("text_config", None) vision_config_dict = model_cfg.get("vision_cfg", None) or model_cfg.get("vision_config", None) config = CLIPConfig( text_config_dict=text_config_dict, vision_config_dict=vision_config_dict, **open_clip_config, ) return config # function is overloaded to avoid fails when calling the TasksManager.infer_library_from_model in OptimizedModel.from_pretrained() # this should be removed when open_clip library support is added to optimum @classmethod def from_pretrained( cls, model_id: Union[str, Path], export: bool = False, config: Optional["PretrainedConfig"] = None, token: Optional[Union[bool, str]] = None, revision: Optional[str] = None, force_download: bool = False, cache_dir: str = HUGGINGFACE_HUB_CACHE, subfolder: str = "", local_files_only: bool = False, task: Optional[str] = None, trust_remote_code: bool = False, **kwargs, ): if is_offline_mode() and not local_files_only: logger.info("Offline mode: forcing local_files_only=True") local_files_only = True _export = export try: if local_files_only: object_id = model_id.replace("/", "--") cached_model_dir = os.path.join(cache_dir, f"models--{object_id}") refs_file = os.path.join(os.path.join(cached_model_dir, "refs"), revision or "main") with open(refs_file) as f: revision = f.read() model_dir = os.path.join(cached_model_dir, "snapshots", revision) else: model_dir = model_id ov_files = _find_files_matching_pattern( model_dir, pattern=r"(.*)?openvino(.*)?\_model\_(.*)?.xml$", subfolder=subfolder, use_auth_token=token, revision=revision, ) _export = len(ov_files) == 0 if _export ^ export: if export: logger.warning( f"The model {model_id} was already converted to the OpenVINO IR but got `export=True`, the model will be converted to OpenVINO once again. " "Don't forget to save the resulting model with `.save_pretrained()`" ) _export = True else: logger.warning( f"No OpenVINO files were found for {model_id}, setting `export=True` to convert the model to the OpenVINO IR. " "Don't forget to save the resulting model with `.save_pretrained()`" ) except Exception as exception: logger.warning( f"Could not infer whether the model was already converted or not to the OpenVINO IR, keeping `export={export}`.\n{exception}" ) if isinstance(model_id, Path): model_id = model_id.as_posix() config_path = config if isinstance(config, (str, os.PathLike)) else model_id config = cls._load_config( config_path, revision=revision, cache_dir=cache_dir, token=token, force_download=force_download, subfolder=subfolder, trust_remote_code=trust_remote_code, local_files_only=local_files_only, ) from_pretrained_method = cls._export if _export else cls._from_pretrained return from_pretrained_method( model_id=model_id, config=config, revision=revision, cache_dir=cache_dir, force_download=force_download, token=token, subfolder=subfolder, local_files_only=local_files_only, trust_remote_code=trust_remote_code, task=task, **kwargs, ) @add_start_docstrings( """ OpenVINO Model for OpenCLIP Text model for tasks like zero-shot-image-classification. """, MODEL_START_DOCSTRING, ) class OVModelOpenCLIPText(OVModelOpenCLIPBase): _xml_model_name = "openvino_model_text.xml" export_feature = "feature-extraction" def __init__(self, model=None, config=None, tokenize_cfg=None, **kwargs): super().__init__(model, config, **kwargs) self.tokenize_cfg = tokenize_cfg @classmethod def _export( cls, model_id: str, config: PretrainedConfig, token: Optional[Union[bool, str]] = None, revision: Optional[str] = None, force_download: bool = False, cache_dir: str = HUGGINGFACE_HUB_CACHE, subfolder: str = "", local_files_only: bool = False, task: Optional[str] = None, trust_remote_code: bool = False, load_in_8bit: Optional[bool] = None, quantization_config: Union[OVWeightQuantizationConfig, Dict] = None, **kwargs, ): save_dir = TemporaryDirectory() save_dir_path = Path(save_dir.name) # This attribute is needed to keep one reference on the temporary directory, since garbage collecting # would end-up removing the directory containing the underlying OpenVINO model cls._model_save_dir_tempdirectory_instance = save_dir # If load_in_8bit and quantization_config not specified then ov_config is set to None and will be set by default in convert depending on the model size if load_in_8bit is None and not quantization_config: ov_config = None else: ov_config = OVConfig(dtype="fp32") def fn_get_submodels(model): return {"model_text": model.text} custom_export_configs = { "model_text": {}, } main_export( model_name_or_path=model_id, output=save_dir_path, task=task or cls.export_feature, subfolder=subfolder, revision=revision, cache_dir=cache_dir, token=token, local_files_only=local_files_only, force_download=force_download, trust_remote_code=trust_remote_code, ov_config=ov_config, library_name=cls._library_name, framework="pt", fn_get_submodels=fn_get_submodels, custom_architecture=True, custom_export_configs=custom_export_configs, ) config.save_pretrained(save_dir_path) return cls._from_pretrained( model_id=save_dir_path, config=config, load_in_8bit=load_in_8bit, quantization_config=quantization_config, file_name=cls._xml_model_name, **kwargs, ) @classmethod def _from_pretrained( cls, model_id: Union[str, Path], config: PretrainedConfig, token: Optional[Union[bool, str]] = None, revision: Optional[str] = None, force_download: bool = False, cache_dir: str = HUGGINGFACE_HUB_CACHE, file_name: Optional[str] = "openvino_model_text.xml", subfolder: str = "", from_onnx: bool = False, local_files_only: bool = False, load_in_8bit: bool = False, quantization_config: Union[OVWeightQuantizationConfig, Dict] = None, **kwargs, ): return super()._from_pretrained( model_id=model_id, config=config, token=token, revision=revision, force_download=force_download, cache_dir=cache_dir, file_name=file_name, subfolder=subfolder, from_onnx=from_onnx, local_files_only=local_files_only, load_in_8bit=load_in_8bit, quantization_config=quantization_config, **kwargs, ) def forward( self, input_ids: Union[torch.Tensor, np.ndarray], **kwargs, ): self.compile() inputs = {"text": input_ids} outputs = self._inference(inputs) return ModelOutput(text_features=torch.from_numpy(outputs["text_features"])) @add_start_docstrings( """ OpenVINO Model for OpenCLIP Vision model for tasks like zero-shot-image-classification. """, MODEL_START_DOCSTRING, ) class OVModelOpenCLIPVisual(OVModelOpenCLIPBase): _xml_model_name = "openvino_model_vision.xml" export_feature = "feature-extraction" def __init__(self, model=None, config=None, preprocess_cfg=None, **kwargs): super().__init__(model, config, **kwargs) self.preprocess_cfg = preprocess_cfg @classmethod def _export( cls, model_id: str, config: PretrainedConfig, token: Optional[Union[bool, str]] = None, revision: Optional[str] = None, force_download: bool = False, cache_dir: str = HUGGINGFACE_HUB_CACHE, subfolder: str = "", local_files_only: bool = False, task: Optional[str] = None, trust_remote_code: bool = False, load_in_8bit: Optional[bool] = None, quantization_config: Union[OVWeightQuantizationConfig, Dict] = None, **kwargs, ): save_dir = TemporaryDirectory() save_dir_path = Path(save_dir.name) # This attribute is needed to keep one reference on the temporary directory, since garbage collecting # would end-up removing the directory containing the underlying OpenVINO model cls._model_save_dir_tempdirectory_instance = save_dir # If load_in_8bit and quantization_config not specified then ov_config is set to None and will be set by default in convert depending on the model size if load_in_8bit is None and not quantization_config: ov_config = None else: ov_config = OVConfig(dtype="fp32") def fn_get_submodels(model): return {"model_vision": model.visual} custom_export_configs = { "model_vision": {}, } main_export( model_name_or_path=model_id, output=save_dir_path, task=task or cls.export_feature, subfolder=subfolder, revision=revision, cache_dir=cache_dir, token=token, local_files_only=local_files_only, force_download=force_download, trust_remote_code=trust_remote_code, ov_config=ov_config, library_name=cls._library_name, framework="pt", fn_get_submodels=fn_get_submodels, custom_architecture=True, custom_export_configs=custom_export_configs, ) config.save_pretrained(save_dir_path) return cls._from_pretrained( model_id=save_dir_path, config=config, load_in_8bit=load_in_8bit, quantization_config=quantization_config, file_name=cls._xml_model_name, **kwargs, ) @classmethod def _from_pretrained( cls, model_id: Union[str, Path], config: PretrainedConfig, token: Optional[Union[bool, str]] = None, revision: Optional[str] = None, force_download: bool = False, cache_dir: str = HUGGINGFACE_HUB_CACHE, file_name: Optional[str] = "openvino_model_vision.xml", subfolder: str = "", from_onnx: bool = False, local_files_only: bool = False, load_in_8bit: bool = False, quantization_config: Union[OVWeightQuantizationConfig, Dict] = None, **kwargs, ): return super()._from_pretrained( model_id=model_id, config=config, token=token, revision=revision, force_download=force_download, cache_dir=cache_dir, file_name=file_name, subfolder=subfolder, from_onnx=from_onnx, local_files_only=local_files_only, load_in_8bit=load_in_8bit, quantization_config=quantization_config, **kwargs, ) def forward( self, pixel_values: Union[torch.Tensor, np.ndarray], **kwargs, ): self.compile() inputs = {"x": pixel_values} outputs = self._inference(inputs) return ModelOutput(image_features=torch.from_numpy(outputs["image_features"])) @add_start_docstrings( """ OpenVINO Model with OpenCLIP for tasks like zero-shot-image-classification. """, MODEL_START_DOCSTRING, ) class OVModelOpenCLIPForZeroShotImageClassification: export_feature = "zero-shot-image-classification" def __init__( self, text_model: OVModelOpenCLIPText = None, visual_model: OVModelOpenCLIPVisual = None, config=None, init_logit_scale: float = np.log(1 / 0.07), init_logit_bias: Optional[float] = None, **kwargs, ): super().__init__() self.text_model = text_model self.visual_model = visual_model self.config = config self.logit_scale = None self.logit_scale = torch.nn.Parameter(torch.ones([]) * init_logit_scale) self.logit_bias = None if init_logit_bias is not None: self.logit_bias = torch.nn.Parameter(torch.ones([]) * init_logit_bias) else: self.logit_bias = None def __call__(self, *args, **kwargs): return self.forward(*args, **kwargs) def to(self, device: str): """ Use the specified `device` for inference. For example: "cpu" or "gpu". `device` can be in upper or lower case. To speed up first inference, call `.compile()` after `.to()`. """ self.text_model.to(device=device) self.visual_model.to(device=device) return self @classmethod def from_pretrained( cls, model_id: Union[str, Path], export: bool = False, config: Optional["PretrainedConfig"] = None, token: Optional[Union[bool, str]] = None, revision: Optional[str] = None, force_download: bool = False, cache_dir: str = HUGGINGFACE_HUB_CACHE, subfolder: str = "", local_files_only: bool = False, task: Optional[str] = None, trust_remote_code: bool = False, init_logit_scale: float = np.log(1 / 0.07), init_logit_bias: Optional[float] = None, **kwargs, ): text_model = OVModelOpenCLIPText.from_pretrained( model_id=model_id, export=export, config=config, token=token, revision=revision, force_download=force_download, cache_dir=cache_dir, subfolder=subfolder, local_files_only=local_files_only, task=task or cls.export_feature, trust_remote_code=trust_remote_code, **kwargs, ) visual_model = OVModelOpenCLIPVisual.from_pretrained( model_id=model_id, export=export, config=config, token=token, revision=revision, force_download=force_download, cache_dir=cache_dir, subfolder=subfolder, local_files_only=local_files_only, task=task or cls.export_feature, trust_remote_code=trust_remote_code, **kwargs, ) if config is None: config = text_model.config return cls( text_model=text_model, visual_model=visual_model, config=config, init_logit_scale=init_logit_scale, init_logit_bias=init_logit_bias, **kwargs, ) def forward( self, input_ids: Union[torch.Tensor, np.ndarray], pixel_values: Union[torch.Tensor, np.ndarray], **kwargs, ): self.text_model.compile() self.visual_model.compile() text_model_outputs = self.text_model._inference({"text": input_ids}) visual_model_outputs = self.visual_model._inference({"x": pixel_values}) text_features = F.normalize(torch.from_numpy(text_model_outputs["text_features"]), dim=-1) image_features = F.normalize(torch.from_numpy(visual_model_outputs["image_features"]), dim=-1) logits_per_image = self.logit_scale.exp() * image_features @ text_features.T if self.logit_bias is not None: logits_per_image += self.logit_bias logits_per_text = logits_per_image.T return CLIPOutput( loss=None, logits_per_image=logits_per_image, logits_per_text=logits_per_text, text_embeds=text_features, image_embeds=image_features, text_model_output=text_model_outputs, vision_model_output=visual_model_outputs, ) def save_pretrained( self, save_directory: Union[str, os.PathLike], push_to_hub: bool = False, **kwargs, ): self.text_model.save_pretrained(save_directory=save_directory, push_to_hub=push_to_hub, **kwargs) self.visual_model.save_pretrained(save_directory=save_directory, push_to_hub=push_to_hub, **kwargs) self.config.save_pretrained(save_directory) return def push_to_hub( self, save_directory: str, repository_id: str, private: Optional[bool] = None, token: Optional[Union[bool, str]] = None, ) -> str: self.text_model.push_to_hub( save_directory=save_directory, repository_id=repository_id, private=private, token=token ) self.visual_model.push_to_hub( save_directory=save_directory, repository_id=repository_id, private=private, token=token ) return def compile(self): self.text_model.compile() self.visual_model.compile() def reshape(self, batch_size: int, sequence_length: int, height: int = None, width: int = None): """ Propagates the given input shapes on the model's layers, fixing the inputs shapes of the model. Arguments: batch_size (`int`): The batch size. sequence_length (`int`): The sequence length or number of channels. height (`int`, *optional*): The image height. width (`int`, *optional*): The image width. """ self.text_model.reshape(batch_size=batch_size, sequence_length=sequence_length, height=height, width=width) self.visual_model.reshape(batch_size=batch_size, sequence_length=sequence_length, height=height, width=width) return self def half(self): """ Converts all the model weights to FP16 """ self.text_model.half() self.visual_model.half() return self def eval(self): return self def can_generate(self) -> bool: """ Returns whether this model can generate sequences with `.generate()`. """ return self.text_model.can_generate() and self.visual_model.can_generate()