optimum/furiosa/modeling_base.py (224 lines of code) (raw):

# Copyright 2023 The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from pathlib import Path from shutil import copyfile from tempfile import TemporaryDirectory from typing import Dict, Optional, Tuple, Union import onnx from huggingface_hub import hf_hub_download from transformers import PretrainedConfig from transformers.file_utils import add_start_docstrings # Import Furiosa SDK from furiosa import optimizer from furiosa.runtime import session from furiosa.tools.compiler.api import compile from optimum.exporters.onnx import main_export from optimum.modeling_base import OptimizedModel from .utils import ( FURIOSA_ENF_FILE_NAME, FURIOSA_QUANTIZED_FILE_NAME, ONNX_WEIGHTS_NAME, ONNX_WEIGHTS_NAME_STATIC, maybe_load_preprocessors, maybe_save_preprocessors, ) logger = logging.getLogger(__name__) @add_start_docstrings( """ Base FuriosaAIModel class. """, ) class FuriosaAIBaseModel(OptimizedModel): auto_model_class = None export_feature = None def __init__( self, model: Union[bytes, str, Path], config: PretrainedConfig = None, device: str = None, furiosa_config: Optional[Dict[str, str]] = None, model_save_dir: Optional[Union[str, Path, TemporaryDirectory]] = None, input_shape_dict: Optional[Dict[str, Tuple[int]]] = None, output_shape_dict: Optional[Dict[str, Tuple[int]]] = None, **kwargs, ): self.config = config self.model_save_dir = model_save_dir self.furiosa_config = furiosa_config self.preprocessors = kwargs.get("preprocessors", []) enable_compilation = kwargs.get("compile", True) self.model = model if enable_compilation: self.model = self.compile(model, input_shape_dict, output_shape_dict) self.create_session() def _save_pretrained(self, save_directory: Union[str, Path], file_name: Optional[str] = None, **kwargs): dst_path = Path(save_directory) / FURIOSA_ENF_FILE_NAME if isinstance(self.model, (str, Path)): copyfile(self.model, dst_path) else: with open(dst_path, "wb") as f: f.write(self.model) def create_session(self): """ Create a Furiosa runtime session for the model. Creates a session object using the Furiosa runtime for executing the model. Returns: None """ self.sess = session.create(self.model) self.input_num = self.sess.input_num self.inputs_to_dtype = [] for i in range(self.input_num): self.inputs_to_dtype.append(self.sess.input(i).dtype) @classmethod def _from_pretrained( cls, model_id: Union[str, Path], config: PretrainedConfig, use_auth_token: Optional[Union[bool, str, None]] = None, revision: Optional[Union[str, None]] = None, force_download: bool = False, cache_dir: Optional[str] = None, file_name: Optional[str] = None, subfolder: str = "", from_onnx: bool = False, from_quantized: bool = False, local_files_only: bool = False, input_shape_dict: Optional[Dict[str, Tuple[int]]] = None, output_shape_dict: Optional[Dict[str, Tuple[int]]] = None, **kwargs, ): """ Loads a model and its configuration file from a directory or the Hugging Face Hub. Args: model_id (Union[str, Path]): The directory from which to load the model. Can be either: - The model ID of a pretrained model hosted inside a model repo on huggingface.co. - The path to a directory containing the model weights. config (PretrainedConfig): The configuration object associated with the model. use_auth_token (Union[bool, str, None], defaults to None): The token to use as HTTP bearer authorization for remote files. Needed to load models from a private repository. revision (Union[str, None], defaults to None): The specific model version to use. It can be a branch name, a tag name, or a commit ID. force_download (bool, defaults to False): Whether or not to force the (re-)download of the model weights and configuration files, overriding the cached versions if they exist. cache_dir (str, defaults to None): The path to a directory in which a downloaded pretrained model configuration should be cached if the standard cache should not be used. file_name (str, defaults to None): The file name of the model to load. Overwrites the default file name and allows one to load the model with a different name. subfolder (str, defaults to ""): The subfolder to load the model. from_onnx (bool, defaults to False): Whether the model is being loaded from an ONNX file. from_quantized (bool, defaults to False): Whether the model is being loaded from a quantized file. local_files_only (bool, defaults to False): Whether or not to only look at local files (i.e., do not try to download the model). input_shape_dict (Dict[str, Tuple[int]], defaults to None): A dictionary specifying the input shapes for dynamic models. output_shape_dict (Dict[str, Tuple[int]], defaults to None): A dictionary specifying the output shapes for dynamic models. **kwargs: Additional keyword arguments to be passed to the underlying model loading function. Returns: An instance of the model class loaded from the specified directory or Hugging Face Hub. """ if from_onnx: default_file_name = ONNX_WEIGHTS_NAME elif from_quantized: default_file_name = FURIOSA_QUANTIZED_FILE_NAME else: default_file_name = FURIOSA_ENF_FILE_NAME file_name = file_name or default_file_name # Load the model from local directory if Path(model_id).is_dir(): file_path = Path(model_id) / file_name model_save_dir = model_id preprocessors = maybe_load_preprocessors(model_id) # Download the model from the hub else: file_path = hf_hub_download( repo_id=model_id, filename=file_name, subfolder=subfolder, use_auth_token=use_auth_token, revision=revision, cache_dir=cache_dir, force_download=force_download, local_files_only=local_files_only, ) model_save_dir = Path(file_path).parent preprocessors = maybe_load_preprocessors(model_id, subfolder=subfolder) model = cls.load_model(file_path, input_shape_dict, output_shape_dict) return cls( model, config=config, model_save_dir=model_save_dir, compile=False, preprocessors=preprocessors, **kwargs ) @classmethod def _from_transformers( cls, model_id: str, config: PretrainedConfig, use_auth_token: Optional[Union[bool, str]] = None, revision: Optional[str] = None, force_download: bool = False, cache_dir: Optional[str] = None, subfolder: str = "", local_files_only: bool = False, task: Optional[str] = None, **kwargs, ): """ Export a vanilla Transformers model into an ONNX model using `transformers.onnx.export_onnx`. Arguments: model_id (`str` or `Path`): The directory from which to load the model. Can be either: - The model id of a pretrained model hosted inside a model repo on huggingface.co. - The path to a directory containing the model weights. save_dir (`str` or `Path`): The directory where the exported ONNX model should be saved, default to `transformers.file_utils.default_cache_path`, which is the cache directory for transformers. use_auth_token (`str` or `bool`): Is needed to load models from a private repository revision (`str`): Revision is the specific model version to use. It can be a branch name, a tag name, or a commit id kwargs (`Dict`, *optional*): kwargs will be passed to the model during initialization """ if task is None: task = cls.export_feature save_dir = TemporaryDirectory() save_dir_path = Path(save_dir.name) # Export the model to the ONNX format main_export( model_name_or_path=model_id, output=save_dir_path, task=task, do_validation=False, no_post_process=True, subfolder=subfolder, revision=revision, cache_dir=cache_dir, use_auth_token=use_auth_token, local_files_only=local_files_only, force_download=force_download, ) config.save_pretrained(save_dir_path) maybe_save_preprocessors(model_id, save_dir_path, src_subfolder=subfolder) return cls._from_pretrained( model_id=save_dir_path, config=config, from_onnx=True, use_auth_token=use_auth_token, revision=revision, force_download=force_download, cache_dir=cache_dir, local_files_only=local_files_only, **kwargs, ) @classmethod def load_model( cls, model_path: Union[str, Path], input_shape_dict: Optional[Dict[str, Tuple[int]]] = None, output_shape_dict: Optional[Dict[str, Tuple[int]]] = None, ): """ Loads and processes a model for use with the Furiosa framework. Args: model_path (Union[str, Path]): The path to the model file. input_shape_dict (Dict[str, Tuple[int]], defaults to None): A dictionary specifying the input shapes for dynamic models. output_shape_dict (Dict[str, Tuple[int]], defaults to None): A dictionary specifying the output shapes for dynamic models. Returns: If the model is in the 'onnx' or 'dfg' format, the compiled model in the Furiosa binary format is returned. If the model is in the 'enf' format, the model path is returned as-is. Raises: ValueError: If the model format is not supported or invalid. """ model_path = Path(model_path) if model_path.suffix in (".onnx", ".dfg"): compiled_model = cls.compile(model_path, input_shape_dict, output_shape_dict) return compiled_model if model_path.suffix == ".enf": return model_path raise ValueError("Invalid model types. Supported formats are 'onnx', 'dfg', or 'enf'.") @classmethod def compile( cls, model: Union[str, Path, bytes], input_shape_dict: Optional[Dict[str, Tuple[int]]] = None, output_shape_dict: Optional[Dict[str, Tuple[int]]] = None, ): """ Compiles the model to the Furiosa binary format. Args: model (Union[str, Path]): The model to be compiled. input_shape_dict (Dict[str, Tuple[int]], defaults to None): A dictionary specifying the input shapes for dynamic models. output_shape_dict (Dict[str, Tuple[int]], defaults to None): A dictionary specifying the output shapes for dynamic models. Returns: The compiled model in the Furiosa binary format. Raises: ValueError: If the model format is not supported or invalid. """ if isinstance(model, (str, Path)): model = cls._reshape(model, input_shape_dict, output_shape_dict) input_bytes = Path(model).read_bytes() else: input_bytes = model logger.info("Compiling the model...") compiled_model = compile(input_bytes, target_ir="enf") return compiled_model @staticmethod def _check_is_dynamic(model_path: Union[str, Path]): is_dynamic = False if Path(model_path).suffix == ".onnx": model = onnx.load(model_path) is_dynamic = any(any(dim.dim_param for dim in inp.type.tensor_type.shape.dim) for inp in model.graph.input) return is_dynamic @staticmethod def optimize_model(model: onnx.ModelProto) -> Path: return optimizer.frontend.onnx.optimize_model(model) @staticmethod def _update_inputs_outputs_dims( model_path: Union[str, Path], input_shape_dict: Dict[str, Tuple[int]], output_shape_dict: Dict[str, Tuple[int]], ) -> onnx.ModelProto: from onnx import shape_inference from onnx.tools import update_model_dims model = onnx.load(model_path) updated_model = update_model_dims.update_inputs_outputs_dims(model, input_shape_dict, output_shape_dict) return shape_inference.infer_shapes(updated_model) @classmethod def _reshape( cls, model_path: Union[str, Path], input_shape_dict: Dict[str, Tuple[int]], output_shape_dict: Dict[str, Tuple[int]], ) -> Union[str, Path]: """ Propagates the given input shapes on the model's layers, fixing the input shapes of the model. Args: model_path (Union[str, Path]): Path to the model. input_shape_dict (Dict[str, Tuple[int]]): Input shapes for the model. output_shape_dict (Dict[str, Tuple[int]]): Output shapes for the model. Returns: Union[str, Path]: Path to the model after updating the input shapes. Raises: ValueError: If the model provided has dynamic axes in input/output and no input/output shape is provided. """ if isinstance(model_path, (str, Path)) and Path(model_path).suffix == ".onnx": is_dynamic = cls._check_is_dynamic(model_path) if is_dynamic: if input_shape_dict is None or output_shape_dict is None: raise ValueError( "The model provided has dynamic axes in input/output. Please provide input and output shapes for compilation." ) model = cls._update_inputs_outputs_dims(model_path, input_shape_dict, output_shape_dict) optimized_model = cls.optimize_model(model) static_model_path = Path(model_path).parent / ONNX_WEIGHTS_NAME_STATIC onnx.save(optimized_model, static_model_path) return static_model_path return model_path def forward(self, *args, **kwargs): raise NotImplementedError