optimum/neuron/utils/argument_utils.py

# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities related to CLI arguments."""

import os
from dataclasses import asdict, dataclass, fields, is_dataclass
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union

from ...utils import logging


if TYPE_CHECKING:
    from transformers import PretrainedConfig

logger = logging.get_logger()

DISABLE_ARGUMENT_PATCH = os.environ.get("OPTIMUM_DISABLE_ARGUMENT_PATCH", "0")
DISABLE_STRICT_MODE = os.environ.get("OPTIMUM_DISABLE_STRICT_MODE", "0")


@dataclass
class LoRAAdapterArguments:
    model_ids: Optional[Union[str, List[str]]] = None
    weight_names: Optional[Union[str, List[str]]] = None
    adapter_names: Optional[Union[str, List[str]]] = None
    scales: Optional[Union[float, List[float]]] = None

    def __post_init__(self):
        if isinstance(self.model_ids, str):
            self.model_ids = [
                self.model_ids,
            ]
        if isinstance(self.weight_names, str):
            self.weight_names = [
                self.weight_names,
            ]
        if isinstance(self.adapter_names, str):
            self.adapter_names = [
                self.adapter_names,
            ]
        if isinstance(self.scales, float):
            self.scales = [
                self.scales,
            ]


@dataclass
class IPAdapterArguments:
    model_id: Optional[Union[str, List[str]]] = None
    subfolder: Optional[Union[str, List[str]]] = None
    weight_name: Optional[Union[str, List[str]]] = None
    scale: Optional[Union[float, List[float]]] = None


@dataclass
class ImageEncoderArguments:
    sequence_length: Optional[int] = None
    hidden_size: Optional[int] = None
    projection_dim: Optional[int] = None


@dataclass
class InputShapesArguments:
    batch_size: Optional[int] = None
    text_batch_size: Optional[int] = None
    image_batch_size: Optional[int] = None
    sequence_length: Optional[int] = None
    num_choices: Optional[int] = None
    width: Optional[int] = None
    height: Optional[int] = None
    image_size: Optional[int] = None
    num_images_per_prompt: Optional[int] = None
    patch_size: Optional[int] = None
    num_channels: Optional[int] = None
    feature_size: Optional[int] = None
    nb_max_frames: Optional[int] = None
    audio_sequence_length: Optional[int] = None
    point_batch_size: Optional[int] = None
    nb_points_per_image: Optional[int] = None
    num_beams: Optional[int] = None
    vae_scale_factor: Optional[int] = None
    encoder_hidden_size: Optional[int] = None
    image_encoder_shapes: Optional[ImageEncoderArguments] = None


class DataclassParser:
    def __init__(self, **kwargs):
        for name, cls in self.__class__.__annotations__.items():
            if is_dataclass(cls):
                parsed_kwargs = {k: v for k, v in kwargs.items() if k in {f.name for f in fields(cls)}}
                setattr(self, f"{name}", cls(**parsed_kwargs))


class NeuronArgumentParser(DataclassParser):
    input_shapes: InputShapesArguments

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        for name, value in kwargs.items():
            if value is not None:
                setattr(self, name, value)

    @property
    def lora_args(self):
        _lora_args = LoRAAdapterArguments(
            model_ids=getattr(self, "lora_model_ids", None),
            weight_names=getattr(self, "lora_weight_names", None),
            adapter_names=getattr(self, "lora_adapter_names", None),
            scales=getattr(self, "lora_scales", None),
        )
        return _lora_args

    @property
    def ip_adapter_args(self):
        _ip_adapter_args = IPAdapterArguments(
            model_id=getattr(self, "ip_adapter_id", None),
            subfolder=getattr(self, "ip_adapter_subfolder", None),
            weight_name=getattr(self, "ip_adapter_weight_name", None),
            scale=getattr(self, "ip_adapter_scale", None),
        )
        return _ip_adapter_args
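

# Illustrative only (not part of the original module): a minimal sketch of how
# `NeuronArgumentParser` groups keyword arguments. Kwargs whose names match fields of
# `InputShapesArguments` are collected into `parser.input_shapes`, and every non-None
# kwarg is also set as a plain attribute on the parser. The argument values below are made up.
def _example_neuron_argument_parser():
    parser = NeuronArgumentParser(batch_size=1, sequence_length=128, task="text-classification")
    assert parser.input_shapes.batch_size == 1
    assert parser.input_shapes.sequence_length == 128
    # Unmatched kwargs such as `task` still land on the parser itself.
    assert parser.task == "text-classification"
    # The `lora_args` / `ip_adapter_args` properties fall back to None-filled dataclasses.
    assert parser.lora_args.model_ids is None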
def validate_arg(
    args,
    arg_name: str,
    error_msg: str,
    validation_function: Optional[Callable[[Any], bool]] = None,
    expected_value: Optional[Any] = None,
):
    """
    Checks that the argument called `arg_name` in `args` has a value matching what is expected for AWS Trainium to
    work well. It will patch invalid argument values if the environment variable `OPTIMUM_DISABLE_ARGUMENT_PATCH`
    is left to its default value `"0"` and an expected value is provided.

    Args:
        arg_name (`str`):
            The name of the argument to check.
        error_msg (`str`):
            The error message to show if the argument does not have a proper value.
        validation_function (`Optional[Callable[[Any], bool]]`, defaults to `None`):
            A function taking an argument as input, and returning whether the argument is valid or not.
        expected_value (`Optional[Any]`, defaults to `None`):
            The expected value for the argument:
                - If the environment variable `OPTIMUM_DISABLE_ARGUMENT_PATCH` is `"0"` and the original argument
                  value is invalid, the argument will be set to this value.
                - If `validation_function` is left unspecified, it will be set to the following validation
                  function:
                    ```python
                    def validation_function(arg):
                        return arg == expected_value
                    ```
    """
    if not hasattr(args, arg_name):
        return

    if expected_value is None and validation_function is None:
        raise ValueError(
            "At least an expected value or a validation_function must be provided, but none was provided here."
        )
    elif validation_function is None and expected_value is not None:

        def expected_validation_function(arg):
            return arg == expected_value

        validation_function = expected_validation_function

    arg = getattr(args, arg_name)
    if not validation_function(arg):
        if DISABLE_ARGUMENT_PATCH == "0" and expected_value is not None:
            patching_msg = (
                f"Setting {arg_name} to {expected_value}. To disable automatic argument patching set the "
                f"environment variable OPTIMUM_DISABLE_ARGUMENT_PATCH to 1."
            )
            logger.warning(f"{error_msg}\n{patching_msg}")
            setattr(args, arg_name, expected_value)
        elif DISABLE_STRICT_MODE == "1":
            logger.warning(error_msg)
        else:
            raise_error_msg = (
                "Aborting training. To disable automatic failure when an argument value is inferred to be wrong for "
                "Trainium, set the environment variable OPTIMUM_DISABLE_STRICT_MODE to 1."
            )
            raise ValueError(f"{error_msg}\n{raise_error_msg}")


def convert_neuronx_compiler_args_to_neuron(
    auto_cast: Optional[str],
    auto_cast_type: str,
    disable_fast_relayout: bool,
):
    """
    Builds `compiler_args` for the Neuron compiler.
    """
    compiler_args = []

    if auto_cast is None:
        auto_cast = "none"
    elif auto_cast == "matmul":
        auto_cast = "matmult"

    if auto_cast == "none":
        compiler_args.extend(["--fast-math", auto_cast])
    elif auto_cast == "all":
        if auto_cast_type == "mixed":
            raise ValueError(
                f"For auto_cast={auto_cast}, cannot set auto_cast_type={auto_cast_type}. "
                "Please choose among `bf16`, `fp16` and `tf32`."
            )
        elif auto_cast_type != "bf16":
            compiler_args.extend(["--fast-math", f"fp32-cast-all-{auto_cast_type}"])
        else:
            compiler_args.extend(["--fast-math", auto_cast])
    elif auto_cast == "matmult":
        if auto_cast_type == "mixed":
            compiler_args.extend(["--fast-math", "fp32-cast-matmult"])
        else:
            compiler_args.extend(["--fast-math", f"fp32-cast-matmult-{auto_cast_type}"])
    else:
        raise ValueError(
            f"The auto_cast value {auto_cast} is not valid. Please use one of the following: None, all or matmul."
        )

    if disable_fast_relayout is True:
        compiler_args.append("no-fast-relayout")

    return compiler_args
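

# Illustrative only (not part of the original module): a sketch of the `--fast-math`
# compiler flags produced by `convert_neuronx_compiler_args_to_neuron` for a few
# representative input combinations.
def _example_convert_neuronx_compiler_args():
    # "matmul" is normalized to the neuron spelling "matmult" and combined with the cast type.
    assert convert_neuronx_compiler_args_to_neuron("matmul", "bf16", False) == [
        "--fast-math",
        "fp32-cast-matmult-bf16",
    ]
    # `auto_cast="all"` with the "bf16" cast type maps to a plain "all".
    assert convert_neuronx_compiler_args_to_neuron("all", "bf16", False) == ["--fast-math", "all"]
    # `auto_cast=None` disables casting, and `disable_fast_relayout=True` appends "no-fast-relayout".
    assert convert_neuronx_compiler_args_to_neuron(None, "bf16", True) == [
        "--fast-math",
        "none",
        "no-fast-relayout",
    ]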
def add_shapes_to_config(config_args, input_shapes: Dict[str, Any]):
    for axis, shape in input_shapes.items():
        if shape is not None:
            if is_dataclass(shape):
                shape_dict = asdict(shape)
                config_args[axis] = shape_dict
            else:
                axis = f"static_{axis}"
                config_args[axis] = shape
    return config_args


def store_compilation_config(
    config: Union["PretrainedConfig", Dict],
    input_shapes: Dict[str, int],
    compiler_kwargs: Dict[str, Any],
    dynamic_batch_size: bool,
    compiler_type: str,
    compiler_version: str,
    inline_weights_to_neff: bool,
    optlevel: str,
    tensor_parallel_size: int = 1,
    model_type: Optional[str] = None,
    task: Optional[str] = None,
    input_names: Optional[List[str]] = None,
    output_names: Optional[List[str]] = None,
    output_attentions: bool = False,
    output_hidden_states: bool = False,
    **kwargs,
):
    if isinstance(config, Dict):
        update_func = config.__setitem__
    else:
        update_func = config.__setattr__
    config_args = {}

    # Add neuron version to the config, so it can be checked at load time
    config_args["compiler_type"] = compiler_type
    config_args["compiler_version"] = compiler_version
    config_args["inline_weights_to_neff"] = inline_weights_to_neff

    # Add input shapes during compilation to the config
    config_args = add_shapes_to_config(config_args, input_shapes)

    config_args["dynamic_batch_size"] = dynamic_batch_size
    config_args["tensor_parallel_size"] = tensor_parallel_size

    # Add compilation args to the config
    config_args["optlevel"] = optlevel
    for arg, value in compiler_kwargs.items():
        config_args[arg] = value

    config_args["input_names"] = input_names
    config_args["output_names"] = output_names

    original_model_type = getattr(config, "export_model_type", None) or getattr(
        config, "model_type", None
    )  # prioritize sentence_transformers over transformers
    neuron_model_type = str(model_type).replace("_", "-") if model_type is not None else model_type
    if original_model_type is None:
        update_func(
            "model_type", neuron_model_type
        )  # Add model_type to the config if it doesn't exist before, e.g. a submodel of Stable Diffusion.
    else:
        config_args["model_type"] = (
            neuron_model_type or original_model_type
        )  # Prioritize the Neuron custom model_type, e.g. `t5-encoder`.

    # Add args of optional outputs
    config_args["output_attentions"] = output_attentions
    config_args["output_hidden_states"] = output_hidden_states

    config_args["task"] = task

    update_func("neuron", config_args)

    if hasattr(config, "_diffusers_version"):
        import diffusers

        update_func("_diffusers_version", diffusers.__version__)
    return config
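

# Illustrative only (not part of the original module): a minimal sketch of how compilation
# metadata ends up under the `neuron` entry of a config. A plain dict stands in for a
# `PretrainedConfig`, and the compiler metadata values below are made up for demonstration.
def _example_store_compilation_config():
    config = store_compilation_config(
        config={},
        input_shapes={"batch_size": 1, "sequence_length": 128},
        compiler_kwargs={"auto_cast": "matmul", "auto_cast_type": "bf16"},
        dynamic_batch_size=False,
        compiler_type="neuronx-cc",
        compiler_version="2.0.0",
        inline_weights_to_neff=True,
        optlevel="2",
        model_type="bert",
        task="text-classification",
    )
    # Scalar shapes are stored with a `static_` prefix by `add_shapes_to_config`.
    assert config["neuron"]["static_batch_size"] == 1
    assert config["neuron"]["static_sequence_length"] == 128
    # With no pre-existing model_type on the config, the Neuron model_type is set at the top level.
    assert config["model_type"] == "bert"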