optimum/neuron/utils/cache_utils.py (195 lines of code) (raw):

# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Utilities for caching.""" import os import re from pathlib import Path from typing import List, Optional, Union from uuid import uuid4 from huggingface_hub import ( HfApi, RepoUrl, create_repo, get_token, ) from huggingface_hub.utils import GatedRepoError, HfHubHTTPError, RepositoryNotFoundError, RevisionNotFoundError from transformers import PretrainedConfig from ...utils import logging from ...utils.logging import warn_once from .misc import is_main_worker, string_to_bool logger = logging.get_logger() HOME = Path.home() DEFAULT_HF_HOME = f"{HOME}/.cache/huggingface" XDG_CACHE_HOME = os.environ.get("XDG_CACHE_HOME", None) if XDG_CACHE_HOME is not None: DEFAULT_HF_HOME = f"{XDG_CACHE_HOME}/huggingface" HF_HOME = os.environ.get("HF_HOME", DEFAULT_HF_HOME) CACHE_REPO_FILENAME = "optimum_neuron_custom_cache" HF_HOME_CACHE_REPO_FILE = f"{HF_HOME}/{CACHE_REPO_FILENAME}" CACHE_REPO_NAME = "optimum-neuron-cache" if os.environ.get("HUGGINGFACE_CO_STAGING") == "1": HF_HUB_CACHE_REPOS = [] else: HF_HUB_CACHE_REPOS = [f"aws-neuron/{CACHE_REPO_NAME}"] # For testing purposes. _DISABLE_IS_PRIVATE_REPO_CHECK: bool = string_to_bool( os.environ.get("OPTIMUM_NEURON_DISABLE_IS_PRIVATE_REPO_CHECK", "false") ) if _DISABLE_IS_PRIVATE_REPO_CHECK: logger.warning( "The check that prevents you from pushing compiled files from private models is disabled. This is allowed " "only for testing purposes." ) def load_custom_cache_repo_name_from_hf_home( hf_home_cache_repo_file: Union[str, Path] = HF_HOME_CACHE_REPO_FILE, ) -> Optional[str]: if Path(hf_home_cache_repo_file).exists(): with open(hf_home_cache_repo_file, "r") as fp: repo_id = fp.read() return repo_id.strip() return None def set_custom_cache_repo_name_in_hf_home( repo_id: str, hf_home: str = HF_HOME, check_repo: bool = True, api: Optional[HfApi] = None ): hf_home_cache_repo_file = f"{hf_home}/{CACHE_REPO_FILENAME}" if api is None: api = HfApi() if check_repo: try: api.repo_info(repo_id, repo_type="model") except Exception as e: raise ValueError( f"Could not save the custom Neuron cache repo to be {repo_id} because it does not exist or is " f"private to you. Complete exception message: {e}." ) existing_custom_cache_repo = load_custom_cache_repo_name_from_hf_home(hf_home_cache_repo_file) if is_main_worker() and existing_custom_cache_repo is not None: logger.warning( f"A custom cache repo was already registered: {existing_custom_cache_repo}. It will be overwritten to " f"{repo_id}." ) with open(hf_home_cache_repo_file, "w") as fp: fp.write(repo_id) def delete_custom_cache_repo_name_from_hf_home(hf_home_cache_repo_file: str = HF_HOME_CACHE_REPO_FILE): Path(hf_home_cache_repo_file).unlink(missing_ok=True) def create_custom_cache_repo(repo_id: str = CACHE_REPO_NAME, private: bool = True) -> RepoUrl: repo_url = create_repo(repo_id, private=private, repo_type="model") set_custom_cache_repo_name_in_hf_home(repo_url.repo_id) return repo_url def is_private_repo(repo_id: str) -> bool: """Tells whether `repo_id` is private.""" if _DISABLE_IS_PRIVATE_REPO_CHECK: return False try: HfApi().model_info(repo_id=repo_id, token=get_token()) private_to_user = False except RepositoryNotFoundError: private_to_user = True if private_to_user: private = True else: try: HfApi().model_info(repo_id=repo_id, token=False) private = False except RepositoryNotFoundError: private = True return private _CACHED_HAS_WRITE_ACCESS_TO_REPO = {} def has_write_access_to_repo(repo_id: str) -> bool: # If the result has already been cached, use it instead of requesting the HF Hub again. token = get_token() key = (token, repo_id) if key in _CACHED_HAS_WRITE_ACCESS_TO_REPO: return _CACHED_HAS_WRITE_ACCESS_TO_REPO[key] api = HfApi() has_access = None try: api.delete_branch(repo_id=repo_id, repo_type="model", branch=f"this-branch-does-not-exist-{uuid4()}") except GatedRepoError: has_access = False except RepositoryNotFoundError: # We could raise an error to indicate the user that the repository could not even be found: # raise ValueError(f"Repository {repo_id} not found (repo_type: {repo_type}). Is it a private one?") from e # But here we simply return `False`, because it means that we do not have write access to this repo in the end. has_access = False except RevisionNotFoundError: has_access = True # has write access, otherwise would have been 403 forbidden. except HfHubHTTPError as e: if e.response.status_code in (401, 403): has_access = False if has_access is None: raise ValueError(f"Cannot determine write access to {repo_id}") # Cache the result for subsequent calls. _CACHED_HAS_WRITE_ACCESS_TO_REPO[key] = has_access return has_access def get_hf_hub_cache_repos(log_warnings: bool = False) -> List[str]: """ Retrieves the name of the Hugging Face Hub model repo to use as remote cache. Priority: - If a repo is provided via the `CUSTOM_CACHE_REPO` environment variable, it will be used, - Else, if a custom cache repo has been set locally, it will be used, - Otherwise, it uses the default cache repo (on which most people do not have write access) """ # Default hub repos. hf_hub_repos = HF_HUB_CACHE_REPOS # Locally saved hub repo. saved_custom_cache_repo = load_custom_cache_repo_name_from_hf_home() if saved_custom_cache_repo is not None and saved_custom_cache_repo not in hf_hub_repos: hf_hub_repos = [saved_custom_cache_repo] + hf_hub_repos # Hub repo set via the environment variable CUSTOM_CACHE_REPO. custom_cache_repo = os.environ.get("CUSTOM_CACHE_REPO", None) if custom_cache_repo is not None and custom_cache_repo not in hf_hub_repos: hf_hub_repos = [custom_cache_repo] + hf_hub_repos if log_warnings and is_main_worker() and saved_custom_cache_repo is None and custom_cache_repo is None: warn_once( logger, "No Neuron cache name is saved locally. This means that only the official Neuron cache will be used. You " "can create a Neuron cache repo by running the following command: `optimum-cli neuron cache create`. If " "the Neuron cache already exists you can set it by running the following command: `optimum-cli neuron cache " "set -n [name]`.", ) if log_warnings and is_main_worker() and hf_hub_repos and not has_write_access_to_repo(hf_hub_repos[0]): warn_once( logger, f"You do not have write access to {hf_hub_repos[0]} so you will not be able to push any cached compilation " "files. Please log in and/or use a custom Neuron cache.", ) return hf_hub_repos def get_hf_hub_cache_repo(log_warnings: bool = False) -> str: return get_hf_hub_cache_repos(log_warnings=log_warnings)[0] def get_neuron_cache_path() -> Optional[Path]: # NEURON_CC_FLAGS is the environment variable read by the neuron compiler. # Among other things, this is where the cache directory is specified. neuron_cc_flags = os.environ.get("NEURON_CC_FLAGS", "") if "--no-cache" in neuron_cc_flags: return None else: match_ = re.search(r"--cache_dir=([\w\/-]+)", neuron_cc_flags) if match_: path = Path(match_.group(1)) else: path = Path("/var/tmp/neuron-compile-cache") return path def set_neuron_cache_path(neuron_cache_path: Union[str, Path], ignore_no_cache: bool = False): # NEURON_CC_FLAGS is the environment variable read by the neuron compiler. # Among other things, this is where the cache directory is specified. neuron_cc_flags = os.environ.get("NEURON_CC_FLAGS", "") if "--no-cache" in neuron_cc_flags: if ignore_no_cache: neuron_cc_flags = neuron_cc_flags.replace("--no-cache", "") else: raise ValueError( "Cannot set the neuron compile cache since --no-cache is in NEURON_CC_FLAGS. You can overwrite this " "behaviour by doing ignore_no_cache=True." ) if isinstance(neuron_cache_path, Path): neuron_cache_path = neuron_cache_path.as_posix() match_ = re.search(r"--cache_dir=([\w\/-]+)", neuron_cc_flags) if match_: neuron_cc_flags = neuron_cc_flags[: match_.start(1)] + neuron_cache_path + neuron_cc_flags[match_.end(1) :] else: neuron_cc_flags = neuron_cc_flags + f" --cache_dir={neuron_cache_path}" os.environ["NEURON_CC_FLAGS"] = neuron_cc_flags def get_num_neuron_cores() -> int: neuron_devices_path = Path("/sys/class/neuron_device/") if not neuron_devices_path.is_dir(): num_cores = 0 else: num_cores = len(list(neuron_devices_path.iterdir())) * 2 return num_cores def get_num_neuron_cores_used() -> int: return int(os.environ.get("WORLD_SIZE", "1")) def list_files_in_neuron_cache(neuron_cache_path: Union[str, Path], only_relevant_files: bool = False) -> List[Path]: if isinstance(neuron_cache_path, str): neuron_cache_path = Path(neuron_cache_path) files = [path for path in neuron_cache_path.glob("**/*") if path.is_file()] if only_relevant_files: files = [p for p in files if p.suffix in [".neff", ".pb", ".txt"]] return files def get_model_name_or_path(config: "PretrainedConfig") -> Optional[str]: attribute_names_to_try = ["_model_name_or_path", "_name_or_path"] model_name_or_path = None for name in attribute_names_to_try: attribute = getattr(config, name, None) if attribute is not None: model_name_or_path = attribute break if model_name_or_path == "": model_name_or_path = None return model_name_or_path