optimum/commands/neuron/cache.py

# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines the command line related to dealing with the Neuron cache repo."""

from typing import TYPE_CHECKING

from ...neuron.cache import get_hub_cached_entries, synchronize_hub_cache
from ...neuron.utils.cache_utils import (
    CACHE_REPO_NAME,
    HF_HOME_CACHE_REPO_FILE,
    create_custom_cache_repo,
    set_custom_cache_repo_name_in_hf_home,
)
from ...neuron.utils.require_utils import requires_torch_neuronx
from ...neuron.utils.runner import ExampleRunner
from ...utils import logging
from ..base import BaseOptimumCLICommand, CommandInfo


if TYPE_CHECKING:
    from argparse import ArgumentParser


logger = logging.get_logger()


class CreateCustomCacheRepoCommand(BaseOptimumCLICommand):
    @staticmethod
    def parse_args(parser: "ArgumentParser"):
        parser.add_argument(
            "-n",
            "--name",
            type=str,
            default=CACHE_REPO_NAME,
            help="The name of the repo that will be used as a remote cache for the compilation files.",
        )
        parser.add_argument(
            "--public",
            action="store_true",
            help="If set, the created repo will be public. By default the cache repo is private.",
        )

    def run(self):
        repo_url = create_custom_cache_repo(repo_id=self.args.name, private=not self.args.public)
        public_or_private = "public" if self.args.public else "private"
        logger.info(f"Neuron cache created on the Hugging Face Hub: {repo_url.repo_id} [{public_or_private}].")
        logger.info(f"Neuron cache name set locally to {repo_url.repo_id} in {HF_HOME_CACHE_REPO_FILE}.")


class SetCustomCacheRepoCommand(BaseOptimumCLICommand):
    @staticmethod
    def parse_args(parser: "ArgumentParser"):
        parser.add_argument("name", type=str, help="The name of the repo to use as remote cache.")

    def run(self):
        set_custom_cache_repo_name_in_hf_home(self.args.name)
        logger.info(f"Neuron cache name set locally to {self.args.name} in {HF_HOME_CACHE_REPO_FILE}.")


class AddToCacheRepoCommand(BaseOptimumCLICommand):
    @staticmethod
    def parse_args(parser: "ArgumentParser"):
        parser.add_argument("-m", "--model", type=str, required=True, help="The name or path of the model.")
        parser.add_argument("--task", type=str, required=True, help="The task for which the model should be compiled.")
        # Shapes
        parser.add_argument(
            "--train_batch_size",
            type=int,
            required=True,
            help="The batch size to use during the model compilation for training.",
        )
        parser.add_argument(
            "--eval_batch_size",
            type=int,
            default=None,
            help="The batch size to use during model compilation for evaluation.",
        )
        sequence_length_group = parser.add_mutually_exclusive_group()
        sequence_length_group.add_argument(
            "--sequence_length", type=int, help="The sequence length of the model during compilation."
        )
        seq2seq_sequence_length_group = sequence_length_group.add_argument_group()
        seq2seq_sequence_length_group.add_argument(
            "--encoder_sequence_length",
            type=int,
            help="The sequence length of the encoder part of the model during compilation.",
        )
        seq2seq_sequence_length_group.add_argument(
            "--decoder_sequence_length",
            type=int,
            help="The sequence length of the decoder part of the model during compilation.",
        )
        parser.add_argument(
            "--gradient_accumulation_steps", type=int, default=1, help="The number of gradient accumulation steps."
        )
        parser.add_argument(
            "--precision",
            choices=["fp", "bf16"],
            type=str,
            required=True,
            help="The precision to use during the model compilation.",
        )
        parser.add_argument(
            "--num_cores",
            choices=list(range(1, 33)),
            type=int,
            required=True,
            help="The number of neuron cores to use during compilation.",
        )
        parser.add_argument(
            "--example_dir", type=str, default=None, help="Path to where the example scripts are stored."
        )
        parser.add_argument(
            "--max_steps", type=int, default=10, help="The maximum number of steps to run compilation for."
        )

    def run(self):
        runner = ExampleRunner(self.args.model, self.args.task, example_dir=self.args.example_dir)
        if self.args.eval_batch_size is None:
            self.args.eval_batch_size = self.args.train_batch_size
        if self.args.sequence_length is not None:
            sequence_length = self.args.sequence_length
        elif self.args.encoder_sequence_length is None and self.args.decoder_sequence_length is None:
            raise ValueError(
                "You need to specify either sequence_length or encoder_sequence_length and decoder_sequence_length"
            )
        elif self.args.encoder_sequence_length is None or self.args.decoder_sequence_length is None:
            raise ValueError("Both the encoder_sequence_length and the decoder_sequence_length must be provided.")
        else:
            sequence_length = [self.args.encoder_sequence_length, self.args.decoder_sequence_length]
        returncode, stdout = runner.run(
            self.args.num_cores,
            self.args.precision,
            self.args.train_batch_size,
            sequence_length,
            do_eval=True,
            eval_batch_size=self.args.eval_batch_size,
            gradient_accumulation_steps=self.args.gradient_accumulation_steps,
            num_epochs=3,
            max_steps=self.args.max_steps,
            save_steps=self.args.max_steps // 2,
        )
        if returncode != 0:
            raise ValueError(f"Could not add the model to the cache. Full log:\n{stdout}.")


class SynchronizeRepoCommand(BaseOptimumCLICommand):
    @staticmethod
    def parse_args(parser: "ArgumentParser"):
        parser.add_argument("--repo_id", type=str, default=None, help="The name of the repo to use as remote cache.")
        parser.add_argument(
            "--cache_dir", type=str, default=None, help="The cache directory that contains the compilation files."
        )

    @requires_torch_neuronx
    def run(self):
        synchronize_hub_cache(cache_path=self.args.cache_dir, cache_repo_id=self.args.repo_id)


class LookupRepoCommand(BaseOptimumCLICommand):
    @staticmethod
    def parse_args(parser: "ArgumentParser"):
        parser.add_argument(
            "model_id",
            type=str,
            help="The model_id to lookup cached versions for.",
        )
        parser.add_argument(
            "--task",
            type=str,
            default=None,
            help="The optional task to lookup cached versions for models supporting multiple tasks.",
        )
        parser.add_argument("--repo_id", type=str, default=None, help="The name of the repo to use as remote cache.")

    def _list_entries(self):
        entries = get_hub_cached_entries(self.args.model_id, task=self.args.task, cache_repo_id=self.args.repo_id)
        n_entries = len(entries)
        output = f"\n*** {n_entries} entry(ies) found in cache for {self.args.model_id} ***\n\n"
        for entry in entries:
            for key, value in entry.items():
                output += f"\n{key}: {value}"
            output += "\n"
        print(output)

    def run(self):
        self._list_entries()


class CustomCacheRepoCommand(BaseOptimumCLICommand):
    SUBCOMMANDS = (
        CommandInfo(
            name="create",
            help="Create a model repo on the Hugging Face Hub to store Neuron X compilation files.",
            subcommand_class=CreateCustomCacheRepoCommand,
        ),
        CommandInfo(
            name="set",
            help="Set the name of the Neuron cache repo to use locally.",
            subcommand_class=SetCustomCacheRepoCommand,
        ),
        CommandInfo(
            name="add",
            help="Add a model to the cache of your choice (trainium only).",
            subcommand_class=AddToCacheRepoCommand,
        ),
        CommandInfo(
            name="synchronize",
            help="Synchronize the neuronx compiler cache with a hub cache repo.",
            subcommand_class=SynchronizeRepoCommand,
        ),
        CommandInfo(
            name="lookup",
            help="Lookup the neuronx compiler hub cache for the specified model id.",
            subcommand_class=LookupRepoCommand,
        ),
    )
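
For reference, a minimal sketch of driving the same cache operations directly from Python rather than through the "lookup" and "synchronize" subcommands, using the two helpers this module imports from optimum.neuron.cache. The keyword arguments mirror how the run() methods above call these helpers; the model id and task value are illustrative placeholders, and synchronizing assumes torch_neuronx is available on a Neuron instance.

from optimum.neuron.cache import get_hub_cached_entries, synchronize_hub_cache

# Equivalent of the "lookup" subcommand: list cached compilation entries for a model.
# "my-org/my-model" is a placeholder; task=None and cache_repo_id=None match the CLI defaults.
entries = get_hub_cached_entries("my-org/my-model", task=None, cache_repo_id=None)
for entry in entries:
    print(entry)

# Equivalent of the "synchronize" subcommand: push the local neuronx compiler cache
# to the configured hub cache repo (None means "use the defaults", as in the CLI).
synchronize_hub_cache(cache_path=None, cache_repo_id=None)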