# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""NeuronDiffusionPipelineBase class for inference of diffusion models on neuron devices."""
import copy
import importlib
import inspect
import logging
import os
import shutil
from abc import abstractmethod
from collections import OrderedDict
from dataclasses import asdict
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple, Union
import torch
from huggingface_hub import snapshot_download
from torch.nn import ModuleList
from transformers import CLIPFeatureExtractor, CLIPTokenizer, PretrainedConfig, T5Tokenizer
from transformers.modeling_outputs import ModelOutput
from ..exporters.neuron import (
load_models_and_neuron_configs,
main_export,
normalize_stable_diffusion_input_shapes,
replace_stable_diffusion_submodels,
)
from ..exporters.neuron.model_configs import * # noqa: F403
from ..exporters.tasks import TasksManager
from ..utils import is_diffusers_available
from .cache.entries.multi_model import MultiModelCacheEntry
from .cache.hub_cache import create_hub_compile_cache_proxy
from .modeling_traced import NeuronTracedModel
from .utils import (
DIFFUSION_MODEL_CONTROLNET_NAME,
DIFFUSION_MODEL_IMAGE_ENCODER_NAME,
DIFFUSION_MODEL_TEXT_ENCODER_2_NAME,
DIFFUSION_MODEL_TEXT_ENCODER_NAME,
DIFFUSION_MODEL_TRANSFORMER_NAME,
DIFFUSION_MODEL_UNET_NAME,
DIFFUSION_MODEL_VAE_DECODER_NAME,
DIFFUSION_MODEL_VAE_ENCODER_NAME,
NEURON_FILE_NAME,
DiffusersPretrainedConfig,
NeuronArgumentParser,
check_if_weights_replacable,
is_neuronx_available,
replace_weights,
store_compilation_config,
)
from .utils.require_utils import requires_torch_neuronx
from .utils.version_utils import get_neuronxcc_version
if is_neuronx_available():
import torch_neuronx
NEURON_COMPILER_TYPE = "neuronx-cc"
NEURON_COMPILER_VERSION = get_neuronxcc_version()
if is_diffusers_available():
from diffusers import (
ControlNetModel,
LatentConsistencyModelPipeline,
LCMScheduler,
PixArtAlphaPipeline,
PixArtSigmaPipeline,
StableDiffusionControlNetPipeline,
StableDiffusionImg2ImgPipeline,
StableDiffusionInpaintPipeline,
StableDiffusionInstructPix2PixPipeline,
StableDiffusionPipeline,
StableDiffusionXLControlNetPipeline,
StableDiffusionXLImg2ImgPipeline,
StableDiffusionXLInpaintPipeline,
StableDiffusionXLPipeline,
)
from diffusers.configuration_utils import FrozenDict
from diffusers.image_processor import PixArtImageProcessor, VaeImageProcessor
from diffusers.models.autoencoders.vae import DecoderOutput, DiagonalGaussianDistribution
from diffusers.models.controlnet import ControlNetOutput
from diffusers.models.embeddings import ImageProjection, IPAdapterFullImageProjection
from diffusers.models.modeling_outputs import AutoencoderKLOutput
from diffusers.pipelines.controlnet import MultiControlNetModel
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
from diffusers.schedulers import SchedulerMixin
from diffusers.schedulers.scheduling_utils import SCHEDULER_CONFIG_NAME
from diffusers.utils import CONFIG_NAME
from .pipelines import (
NeuronStableDiffusionControlNetPipelineMixin,
NeuronStableDiffusionXLControlNetPipelineMixin,
NeuronStableDiffusionXLPipelineMixin,
)
os.environ["NEURON_FUSE_SOFTMAX"] = "1"
os.environ["NEURON_CUSTOM_SILU"] = "1"
else:
raise ModuleNotFoundError("`diffusers` python package is not installed.")
if TYPE_CHECKING:
from ..exporters.neuron import NeuronDefaultConfig
logger = logging.getLogger(__name__)
class NeuronDiffusionPipelineBase(NeuronTracedModel):
auto_model_class = DiffusionPipeline
task = None
library_name = "diffusers"
base_model_prefix = "neuron_model"
config_name = "model_index.json"
sub_component_config_name = "config.json"
_optional_components = [
"tokenizer",
"tokenizer_2",
"text_encoder",
"text_encoder_2",
"vae_encoder",
"image_encoder",
"unet",
"transformer",
"feature_extractor",
]
    encoder_hid_proj = None  # Dummy stand-in for the UNet/transformer's `encoder_hid_proj` when they consume the outputs of the image encoder (eg. IP-Adapter).
def __init__(
self,
config: Dict[str, Any],
configs: Dict[str, "PretrainedConfig"],
neuron_configs: Dict[str, "NeuronDefaultConfig"],
data_parallel_mode: Literal["none", "unet", "transformer", "all"],
scheduler: Optional[SchedulerMixin],
vae_decoder: Union[torch.jit._script.ScriptModule, "NeuronModelVaeDecoder"],
text_encoder: Optional[Union[torch.jit._script.ScriptModule, "NeuronModelTextEncoder"]] = None,
text_encoder_2: Optional[Union[torch.jit._script.ScriptModule, "NeuronModelTextEncoder"]] = None,
unet: Optional[Union[torch.jit._script.ScriptModule, "NeuronModelUnet"]] = None,
transformer: Optional[Union[torch.jit._script.ScriptModule, "NeuronModelTransformer"]] = None,
vae_encoder: Optional[Union[torch.jit._script.ScriptModule, "NeuronModelVaeEncoder"]] = None,
image_encoder: Optional[torch.jit._script.ScriptModule] = None,
safety_checker: Optional[torch.jit._script.ScriptModule] = None,
tokenizer: Optional[Union[CLIPTokenizer, T5Tokenizer]] = None,
tokenizer_2: Optional[CLIPTokenizer] = None,
feature_extractor: Optional[CLIPFeatureExtractor] = None,
controlnet: Optional[
Union[
torch.jit._script.ScriptModule,
List[torch.jit._script.ScriptModule],
"NeuronControlNetModel",
"NeuronMultiControlNetModel",
]
] = None,
# stable diffusion xl specific arguments
requires_aesthetics_score: bool = False,
force_zeros_for_empty_prompt: bool = True,
add_watermarker: Optional[bool] = None,
model_save_dir: Optional[Union[str, Path, TemporaryDirectory]] = None,
model_and_config_save_paths: Optional[Dict[str, Tuple[str, Path]]] = None,
):
"""
Args:
config (`Dict[str, Any]`):
A config dictionary from which the model components will be instantiated. Make sure to only load
configuration files of compatible classes.
            configs (`Dict[str, "PretrainedConfig"]`):
                A dictionary of configurations for the components of the pipeline.
            neuron_configs (`Dict[str, "NeuronDefaultConfig"]`):
                A dictionary of Neuron configurations related to the compilation.
            data_parallel_mode (`Literal["none", "unet", "transformer", "all"]`):
                Mode to decide what components to load into both NeuronCores of a Neuron device. Can be "none" (no data parallelism), "unet" (load
                only the U-Net into both cores of each device), "transformer" (load only the diffusion transformer into both cores), or "all" (load the whole pipeline into both cores).
scheduler (`Optional[SchedulerMixin]`):
A scheduler to be used in combination with the U-NET component to denoise the encoded image latents.
vae_decoder (`Union[torch.jit._script.ScriptModule, "NeuronModelVaeDecoder"]`):
The Neuron TorchScript module associated to the VAE decoder.
text_encoder (`Optional[Union[torch.jit._script.ScriptModule, "NeuronModelTextEncoder"]]`, defaults to `None`):
The Neuron TorchScript module associated to the text encoder.
text_encoder_2 (`Optional[Union[torch.jit._script.ScriptModule, "NeuronModelTextEncoder"]]`, defaults to `None`):
The Neuron TorchScript module associated to the second frozen text encoder. Stable Diffusion XL uses the text and pool portion of [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection), specifically the [laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k) variant.
unet (`Optional[Union[torch.jit._script.ScriptModule, "NeuronModelUnet"]]`, defaults to `None`):
The Neuron TorchScript module associated to the U-NET.
transformer (`Optional[Union[torch.jit._script.ScriptModule, "NeuronModelTransformer"]]`, defaults to `None`):
The Neuron TorchScript module associated to the diffuser transformer.
vae_encoder (`Optional[Union[torch.jit._script.ScriptModule, "NeuronModelVaeEncoder"]]`, defaults to `None`):
The Neuron TorchScript module associated to the VAE encoder.
image_encoder (`Optional[torch.jit._script.ScriptModule]`, defaults to `None`):
The Neuron TorchScript module associated to the frozen CLIP image-encoder.
safety_checker (`Optional[torch.jit._script.ScriptModule]`, defaults to `None`):
The Neuron TorchScript module associated to the Classification module that estimates whether generated images could be considered offensive or harmful.
tokenizer (`Optional[Union[CLIPTokenizer, T5Tokenizer]]`, defaults to `None`):
Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer) for stable diffusion models,
or tokenizer of class [T5Tokenizer](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5Tokenizer) for diffusion transformers.
tokenizer_2 (`Optional[CLIPTokenizer]`, defaults to `None`):
Second tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
feature_extractor (`Optional[CLIPFeatureExtractor]`, defaults to `None`):
A model extracting features from generated images to be used as inputs for the `safety_checker`
controlnet (`Optional[Union[torch.jit._script.ScriptModule, List[torch.jit._script.ScriptModule], "NeuronControlNetModel", "NeuronMultiControlNetModel"]]`, defaults to `None`):
The Neuron TorchScript module(s) associated to the ControlNet(s).
requires_aesthetics_score (`bool`, defaults to `False`):
Whether the `unet` requires an `aesthetic_score` condition to be passed during inference. Also see the
config of `stabilityai/stable-diffusion-xl-refiner-1-0`.
force_zeros_for_empty_prompt (`bool`, defaults to `True`):
Whether the negative prompt embeddings shall be forced to always be set to 0. Also see the config of
`stabilityai/stable-diffusion-xl-base-1-0`.
add_watermarker (`Optional[bool]`, defaults to `None`):
Whether to use the [invisible_watermark library](https://github.com/ShieldMnt/invisible-watermark/) to
watermark output images. If not defined, it will default to True if the package is installed, otherwise no
watermarker will be used.
model_save_dir (`Optional[Union[str, Path, TemporaryDirectory]]`, defaults to `None`):
The directory under which the exported Neuron models were saved.
model_and_config_save_paths (`Optional[Dict[str, Tuple[str, Path]]]`, defaults to `None`):
The paths where exported Neuron models were saved.
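
        Example of selecting a data parallel mode when loading a compiled pipeline (a minimal sketch; the local
        directory `sd_neuron/` is assumed to contain a pipeline previously exported with `export=True` and saved
        with `save_pretrained`):

        ```python
        >>> from optimum.neuron import NeuronStableDiffusionPipeline

        >>> # Load the U-Net on both NeuronCores and the remaining submodels on a single core
        >>> pipe = NeuronStableDiffusionPipeline.from_pretrained("sd_neuron/", data_parallel_mode="unet")
        ```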
"""
# configurations
self._internal_dict = config
self.data_parallel_mode = data_parallel_mode
self.configs = configs
self.neuron_configs = neuron_configs
self.dynamic_batch_size = all(
neuron_config._config.neuron["dynamic_batch_size"] for neuron_config in self.neuron_configs.values()
)
# pipeline components
self.text_encoder = (
NeuronModelTextEncoder(
text_encoder,
self,
self.configs[DIFFUSION_MODEL_TEXT_ENCODER_NAME],
self.neuron_configs[DIFFUSION_MODEL_TEXT_ENCODER_NAME],
)
if text_encoder is not None and not isinstance(text_encoder, NeuronModelTextEncoder)
else text_encoder
)
self.text_encoder_2 = (
NeuronModelTextEncoder(
text_encoder_2,
self,
self.configs[DIFFUSION_MODEL_TEXT_ENCODER_2_NAME],
self.neuron_configs[DIFFUSION_MODEL_TEXT_ENCODER_2_NAME],
)
if text_encoder_2 is not None and not isinstance(text_encoder_2, NeuronModelTextEncoder)
else text_encoder_2
)
self.unet = (
NeuronModelUnet(
unet, self, self.configs[DIFFUSION_MODEL_UNET_NAME], self.neuron_configs[DIFFUSION_MODEL_UNET_NAME]
)
if unet is not None and not isinstance(unet, NeuronModelUnet)
else unet
)
self.transformer = (
NeuronModelTransformer(
transformer,
self,
self.configs[DIFFUSION_MODEL_TRANSFORMER_NAME],
self.neuron_configs[DIFFUSION_MODEL_TRANSFORMER_NAME],
)
if transformer is not None and not isinstance(transformer, NeuronModelTransformer)
else transformer
)
self.vae_encoder = (
NeuronModelVaeEncoder(
vae_encoder,
self,
self.configs[DIFFUSION_MODEL_VAE_ENCODER_NAME],
self.neuron_configs[DIFFUSION_MODEL_VAE_ENCODER_NAME],
)
if vae_encoder is not None and not isinstance(vae_encoder, NeuronModelVaeEncoder)
else vae_encoder
)
self.vae_decoder = (
NeuronModelVaeDecoder(
vae_decoder,
self,
self.configs[DIFFUSION_MODEL_VAE_DECODER_NAME],
self.neuron_configs[DIFFUSION_MODEL_VAE_DECODER_NAME],
)
if vae_decoder is not None and not isinstance(vae_decoder, NeuronModelVaeDecoder)
else vae_decoder
)
self.vae = NeuronModelVae(self.vae_encoder, self.vae_decoder)
if (
controlnet
and not isinstance(controlnet, NeuronControlNetModel)
and not isinstance(controlnet, NeuronMultiControlNetModel)
):
controlnet_cls = (
NeuronMultiControlNetModel
if isinstance(controlnet, list) and len(controlnet) > 1
else NeuronControlNetModel
)
self.controlnet = controlnet_cls(
controlnet,
self,
self.configs[DIFFUSION_MODEL_CONTROLNET_NAME],
self.neuron_configs[DIFFUSION_MODEL_CONTROLNET_NAME],
)
else:
self.controlnet = controlnet
self.tokenizer = tokenizer
self.tokenizer_2 = tokenizer_2
self.scheduler = scheduler
        # Switch to the LCM scheduler for latent consistency models, which changes the denoising procedure.
self.is_lcm = False
if self.unet and NeuronDiffusionPipelineBase.is_lcm(self.unet.config):
self.is_lcm = True
self.scheduler = LCMScheduler.from_config(self.scheduler.config)
self.feature_extractor = feature_extractor
self.image_encoder = (
NeuronModelImageEncoder(
image_encoder,
self,
self.configs[DIFFUSION_MODEL_IMAGE_ENCODER_NAME],
self.neuron_configs[DIFFUSION_MODEL_IMAGE_ENCODER_NAME],
)
if image_encoder is not None and not isinstance(image_encoder, NeuronModelImageEncoder)
else image_encoder
)
self.safety_checker = safety_checker # TODO: implement the class `NeuronStableDiffusionSafetyChecker`.
all_possible_init_args = {
"vae": self.vae,
"unet": self.unet,
"transformer": self.transformer,
"text_encoder": self.text_encoder,
"text_encoder_2": self.text_encoder_2,
"controlnet": self.controlnet,
"image_encoder": self.image_encoder,
"safety_checker": self.safety_checker,
"scheduler": self.scheduler,
"tokenizer": self.tokenizer,
"tokenizer_2": self.tokenizer_2,
"feature_extractor": self.feature_extractor,
"requires_aesthetics_score": requires_aesthetics_score,
"force_zeros_for_empty_prompt": force_zeros_for_empty_prompt,
"add_watermarker": add_watermarker,
}
diffusers_pipeline_args = {}
for key in inspect.signature(self.auto_model_class).parameters.keys():
if key in all_possible_init_args:
diffusers_pipeline_args[key] = all_possible_init_args[key]
self.auto_model_class.__init__(self, **diffusers_pipeline_args)
self._attributes_init(model_save_dir)
self.model_and_config_save_paths = model_and_config_save_paths if model_and_config_save_paths else None
self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
self.register_to_config(requires_aesthetics_score=requires_aesthetics_score)
# Calculate static shapes
if hasattr(self.vae.config, "block_out_channels"):
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
else:
self.vae_scale_factor = 8
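        # The compiled unet/transformer absorbs `num_images_per_prompt` into its static batch size, so it is
        # recovered here from the ratio between the unet/transformer and text encoder batch sizes.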
unet_or_transformer = "transformer" if self.transformer else "unet"
unet_or_transformer_batch_size = self.neuron_configs[unet_or_transformer].batch_size
if "text_encoder" in self.neuron_configs:
text_encoder_batch_size = self.neuron_configs["text_encoder"].batch_size
self.num_images_per_prompt = unet_or_transformer_batch_size // text_encoder_batch_size
elif "text_encoder_2" in self.neuron_configs:
text_encoder_batch_size = self.neuron_configs["text_encoder_2"].batch_size
self.num_images_per_prompt = unet_or_transformer_batch_size // text_encoder_batch_size
else:
self.num_images_per_prompt = 1
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
self.control_image_processor = VaeImageProcessor(
vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True, do_normalize=False
)
# create dummy objects for inference with ip adapters
self._maybe_create_dummy_image_proj_layers()
@staticmethod
def is_lcm(unet_config):
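        # Heuristic: latent consistency models are detected from the checkpoint name stored in the unet config.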
patterns = ["lcm", "latent-consistency"]
unet_name_or_path = getattr(unet_config, "_name_or_path", "").lower()
return any(pattern in unet_name_or_path for pattern in patterns)
@staticmethod
@requires_torch_neuronx
def load_model(
data_parallel_mode: Optional[Literal["none", "unet", "transformer", "all"]],
text_encoder_path: Optional[Union[str, Path]] = None,
text_encoder_2_path: Optional[Union[str, Path]] = None,
image_encoder_path: Optional[Union[str, Path]] = None,
unet_path: Optional[Union[str, Path]] = None,
transformer_path: Optional[Union[str, Path]] = None,
vae_encoder_path: Optional[Union[str, Path]] = None,
vae_decoder_path: Optional[Union[str, Path]] = None,
controlnet_paths: Optional[List[Path]] = None,
dynamic_batch_size: bool = False,
to_neuron: bool = False,
):
"""
        Loads Stable Diffusion TorchScript modules compiled by the neuron(x)-cc compiler. They are first loaded onto the CPU and then moved to
        one or multiple [NeuronCores](https://awsdocs-neuron.readthedocs-hosted.com/en/latest/general/arch/neuron-hardware/neuroncores-arch.html).
Args:
            data_parallel_mode (`Optional[Literal["none", "unet", "transformer", "all"]]`):
                Mode to decide what components to load into both NeuronCores of a Neuron device. Can be "none" (no data parallelism), "unet" (load
                only the U-Net into both cores of each device), "transformer" (load only the diffusion transformer into both cores), or "all" (load the whole pipeline into both cores).
            text_encoder_path (`Optional[Union[str, Path]]`, defaults to `None`):
Path of the compiled text encoder.
text_encoder_2_path (`Optional[Union[str, Path]]`, defaults to `None`):
Path of the compiled second frozen text encoder. SDXL only.
image_encoder_path (`Optional[Union[str, Path]]`, defaults to `None`):
Path of the compiled image encoder.
unet_path (`Optional[Union[str, Path]]`, defaults to `None`):
Path of the compiled U-NET.
transformer_path (`Optional[Union[str, Path]]`, defaults to `None`):
Path of the compiled diffusion transformer.
vae_encoder_path (`Optional[Union[str, Path]]`, defaults to `None`):
Path of the compiled VAE encoder. It is optional, only used for tasks taking images as input.
vae_decoder_path (`Optional[Union[str, Path]]`, defaults to `None`):
Path of the compiled VAE decoder.
controlnet_paths (`Optional[List[Path]]`, defaults to `None`):
                Paths of the compiled ControlNets.
dynamic_batch_size (`bool`, defaults to `False`):
                Whether to enable dynamic batch size for the neuron compiled model. If `True`, the input batch size can be a multiple of the batch size during the compilation.
to_neuron (`bool`, defaults to `False`):
                Whether to manually move the traced model to a NeuronCore. It is only needed when `inline_weights_to_neff=False`; otherwise the model is loaded onto a Neuron device automatically.
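
        Example of a direct call (a minimal sketch; the directory layout and the `model.neuron` file names are
        assumptions based on what `save_pretrained` produces by default):

        ```python
        >>> from pathlib import Path
        >>> from optimum.neuron.modeling_diffusion import NeuronDiffusionPipelineBase

        >>> submodels = NeuronDiffusionPipelineBase.load_model(
        ...     data_parallel_mode="unet",
        ...     text_encoder_path=Path("sd_neuron/text_encoder/model.neuron"),
        ...     unet_path=Path("sd_neuron/unet/model.neuron"),
        ...     vae_encoder_path=Path("sd_neuron/vae_encoder/model.neuron"),
        ...     vae_decoder_path=Path("sd_neuron/vae_decoder/model.neuron"),
        ... )
        ```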
"""
submodels = {
# Load the UNet/Diffusion transformer first to avoid CPU OOM
"unet": unet_path,
"transformer": transformer_path,
"text_encoder": text_encoder_path,
"text_encoder_2": text_encoder_2_path,
"vae_encoder": vae_encoder_path,
"vae_decoder": vae_decoder_path,
"controlnet": controlnet_paths,
"image_encoder": image_encoder_path,
}
def _load_models_to_neuron(submodels, models_on_both_cores=None, models_on_a_single_core=None):
# loading models to both cores, eg. unet, transformer.
if models_on_both_cores:
for model_name in models_on_both_cores:
submodel_paths = submodels[model_name]
# for the case of multiple controlnets the path could be a list
if not isinstance(submodel_paths, list):
submodel_paths = [submodel_paths]
submodels_list = []
for submodel_path in submodel_paths:
if submodel_path is not None and submodel_path.is_file():
submodel = NeuronTracedModel.load_model(
submodel_path, to_neuron=False
) # No need to load to neuron manually when dp
submodel = torch_neuronx.DataParallel(
submodel,
[0, 1],
set_dynamic_batching=dynamic_batch_size,
)
submodels_list.append(submodel)
if submodels_list:
submodels[model_name] = submodels_list if len(submodels_list) > 1 else submodels_list[0]
else:
submodels[model_name] = None
# loading models to a single core, eg. text encoders, vae.
if models_on_a_single_core:
for model_name in models_on_a_single_core:
submodel_paths = submodels[model_name]
# for the case of multiple controlnets the path could be a list
if not isinstance(submodel_paths, list):
submodel_paths = [submodel_paths]
submodels_list = []
for submodel_path in submodel_paths:
if submodel_path is not None and submodel_path.is_file():
submodel = NeuronTracedModel.load_model(submodel_path, to_neuron=to_neuron)
submodels_list.append(submodel)
if submodels_list:
submodels[model_name] = submodels_list if len(submodels_list) > 1 else submodels_list[0]
else:
submodels[model_name] = None
return submodels
if data_parallel_mode == "all":
logger.info("Loading the whole pipeline into both Neuron Cores...")
submodels = _load_models_to_neuron(submodels=submodels, models_on_both_cores=list(submodels))
elif data_parallel_mode == "unet":
logger.info("Loading only U-Net into both Neuron Cores...")
models_on_a_single_core = list(submodels)
models_on_a_single_core.remove("unet")
models_on_a_single_core.remove(
"controlnet"
) # controlnet takes inputs with the same batch_size as the unet
submodels = _load_models_to_neuron(
submodels=submodels,
models_on_both_cores=["unet", "controlnet"],
models_on_a_single_core=models_on_a_single_core,
)
elif data_parallel_mode == "transformer":
logger.info("Loading only diffusion transformer into both Neuron Cores...")
models_on_a_single_core = list(submodels)
models_on_a_single_core.remove("transformer")
models_on_a_single_core.remove(
"controlnet"
) # controlnet takes inputs with the same batch_size as the transformer
submodels = _load_models_to_neuron(
submodels=submodels,
models_on_both_cores=["transformer", "controlnet"],
models_on_a_single_core=models_on_a_single_core,
)
elif data_parallel_mode == "none":
logger.info("Loading the pipeline without any data parallelism...")
submodels = _load_models_to_neuron(submodels=submodels, models_on_a_single_core=list(submodels))
else:
raise ValueError("You need to pass `data_parallel_mode` to define Neuron Core allocation.")
return submodels
def replace_weights(self, weights: Optional[Union[Dict[str, torch.Tensor], torch.nn.Module]] = None):
check_if_weights_replacable(self.configs, weights)
model_names = [
"text_encoder",
"text_encoder_2",
"unet",
"transformer",
"vae_decoder",
"vae_encoder",
"image_encoder",
]
for name in model_names:
model = getattr(self, name, None)
weight = getattr(weights, name, None)
if model is not None and weight is not None:
model = replace_weights(model.model, weight)
@staticmethod
def set_default_dp_mode(configs: Dict):
if "unet" in configs:
unet_config = configs["unet"]
            if NeuronDiffusionPipelineBase.is_lcm(unet_config):
# LCM applies guidance using guidance embeddings, so we can load the whole pipeline into both cores.
return "all"
else:
# Load U-Net into both cores for classifier-free guidance which doubles batch size of inputs passed to the U-Net.
return "unet"
elif "transformer" in configs:
return "transformer"
else:
            logger.warning(
                "There is neither a unet nor a transformer in your pipeline; data parallelism will be disabled. Make sure that you are loading the model correctly!"
            )
return "none"
def _save_pretrained(
self,
save_directory: Union[str, Path],
text_encoder_file_name: str = NEURON_FILE_NAME,
text_encoder_2_file_name: str = NEURON_FILE_NAME,
unet_file_name: str = NEURON_FILE_NAME,
transformer_file_name: str = NEURON_FILE_NAME,
vae_encoder_file_name: str = NEURON_FILE_NAME,
vae_decoder_file_name: str = NEURON_FILE_NAME,
controlnet_file_name: str = NEURON_FILE_NAME,
image_encoder_file_name: str = NEURON_FILE_NAME,
):
"""
Saves the model to the serialized format optimized for Neuron devices.
"""
if self.model_and_config_save_paths is None:
            logger.warning(
                "`model_and_config_save_paths` is None, which means that no Neuron model paths are defined. Nothing will be saved."
            )
return
save_directory = Path(save_directory)
def _remove_submodel_if_non_exist(model_names):
for model_name in model_names:
if not self.model_and_config_save_paths.get(model_name)[0].is_file():
self.model_and_config_save_paths.pop(model_name)
_remove_submodel_if_non_exist(
[
DIFFUSION_MODEL_TEXT_ENCODER_NAME,
DIFFUSION_MODEL_TEXT_ENCODER_2_NAME,
DIFFUSION_MODEL_UNET_NAME,
DIFFUSION_MODEL_TRANSFORMER_NAME,
DIFFUSION_MODEL_VAE_ENCODER_NAME,
DIFFUSION_MODEL_IMAGE_ENCODER_NAME,
]
)
if not self.model_and_config_save_paths.get(DIFFUSION_MODEL_CONTROLNET_NAME)[0]:
self.model_and_config_save_paths.pop(DIFFUSION_MODEL_CONTROLNET_NAME)
num_controlnet = 0
else:
num_controlnet = len(self.model_and_config_save_paths.get(DIFFUSION_MODEL_CONTROLNET_NAME)[0])
logger.info(f"Saving the {tuple(self.model_and_config_save_paths.keys())}...")
dst_paths = {
DIFFUSION_MODEL_TEXT_ENCODER_NAME: save_directory
/ DIFFUSION_MODEL_TEXT_ENCODER_NAME
/ text_encoder_file_name,
DIFFUSION_MODEL_TEXT_ENCODER_2_NAME: save_directory
/ DIFFUSION_MODEL_TEXT_ENCODER_2_NAME
/ text_encoder_2_file_name,
DIFFUSION_MODEL_UNET_NAME: save_directory / DIFFUSION_MODEL_UNET_NAME / unet_file_name,
DIFFUSION_MODEL_TRANSFORMER_NAME: save_directory
/ DIFFUSION_MODEL_TRANSFORMER_NAME
/ transformer_file_name,
DIFFUSION_MODEL_VAE_ENCODER_NAME: save_directory
/ DIFFUSION_MODEL_VAE_ENCODER_NAME
/ vae_encoder_file_name,
DIFFUSION_MODEL_VAE_DECODER_NAME: save_directory
/ DIFFUSION_MODEL_VAE_DECODER_NAME
/ vae_decoder_file_name,
DIFFUSION_MODEL_IMAGE_ENCODER_NAME: save_directory
/ DIFFUSION_MODEL_IMAGE_ENCODER_NAME
/ image_encoder_file_name,
}
dst_paths[DIFFUSION_MODEL_CONTROLNET_NAME] = [
save_directory / (DIFFUSION_MODEL_CONTROLNET_NAME + f"_{str(idx)}") / controlnet_file_name
for idx in range(num_controlnet)
]
src_paths_list = []
dst_paths_list = []
for model_name in set(self.model_and_config_save_paths.keys()).intersection(dst_paths.keys()):
model_src_path = self.model_and_config_save_paths[model_name][0]
if isinstance(model_src_path, list):
# neuron model
src_paths_list += model_src_path
dst_paths_list += dst_paths[model_name]
# config
src_paths_list += self.model_and_config_save_paths[model_name][1]
dst_paths_list += [model_path.parent / CONFIG_NAME for model_path in dst_paths[model_name]]
else:
# neuron model
src_paths_list.append(model_src_path)
dst_paths_list.append(dst_paths[model_name])
# config
src_paths_list.append(self.model_and_config_save_paths[model_name][1])
dst_paths_list.append(dst_paths[model_name].parent / CONFIG_NAME)
for src_path, dst_path in zip(src_paths_list, dst_paths_list):
dst_path.parent.mkdir(parents=True, exist_ok=True)
if src_path.is_file():
shutil.copyfile(src_path, dst_path)
if self.tokenizer is not None:
self.tokenizer.save_pretrained(save_directory.joinpath("tokenizer"))
if self.tokenizer_2 is not None:
self.tokenizer_2.save_pretrained(save_directory.joinpath("tokenizer_2"))
self.scheduler.save_pretrained(save_directory.joinpath("scheduler"))
if self.feature_extractor is not None:
self.feature_extractor.save_pretrained(save_directory.joinpath("feature_extractor"))
@classmethod
@requires_torch_neuronx
def _from_pretrained(
cls,
model_id: Union[str, Path],
config: Dict[str, Any],
token: Optional[Union[bool, str]] = None,
revision: Optional[str] = None,
force_download: bool = False,
cache_dir: Optional[str] = None,
text_encoder_file_name: Optional[str] = NEURON_FILE_NAME,
text_encoder_2_file_name: Optional[str] = NEURON_FILE_NAME,
unet_file_name: Optional[str] = NEURON_FILE_NAME,
transformer_file_name: Optional[str] = NEURON_FILE_NAME,
vae_encoder_file_name: Optional[str] = NEURON_FILE_NAME,
vae_decoder_file_name: Optional[str] = NEURON_FILE_NAME,
controlnet_file_name: Optional[str] = NEURON_FILE_NAME,
image_encoder_file_name: Optional[str] = NEURON_FILE_NAME,
local_files_only: bool = False,
model_save_dir: Optional[Union[str, Path, TemporaryDirectory]] = None,
data_parallel_mode: Optional[Literal["none", "unet", "transformer", "all"]] = None,
**kwargs, # To share kwargs only available for `_from_transformers`
):
model_id = str(model_id)
patterns = set(config.keys())
processors_to_load = patterns.intersection({"feature_extractor", "tokenizer", "tokenizer_2", "scheduler"})
if not os.path.isdir(model_id):
patterns.update({DIFFUSION_MODEL_VAE_ENCODER_NAME, DIFFUSION_MODEL_VAE_DECODER_NAME})
allow_patterns = {os.path.join(k, "*") for k in patterns if not k.startswith("_")}
allow_patterns.update(
{
text_encoder_file_name,
text_encoder_2_file_name,
unet_file_name,
transformer_file_name,
vae_encoder_file_name,
vae_decoder_file_name,
controlnet_file_name,
image_encoder_file_name,
SCHEDULER_CONFIG_NAME,
CONFIG_NAME,
cls.config_name,
}
)
# Downloads all repo's files matching the allowed patterns
model_id = snapshot_download(
model_id,
cache_dir=cache_dir,
local_files_only=local_files_only,
token=token,
revision=revision,
force_download=force_download,
allow_patterns=allow_patterns,
ignore_patterns=["*.msgpack", "*.safetensors", "*.bin"],
)
new_model_save_dir = Path(model_id)
sub_models = {}
for name in processors_to_load:
library_name, library_classes = config[name]
if library_classes is not None:
library = importlib.import_module(library_name)
class_obj = getattr(library, library_classes)
load_method = getattr(class_obj, "from_pretrained")
# Check if the module is in a subdirectory
if (new_model_save_dir / name).is_dir():
sub_models[name] = load_method(new_model_save_dir / name)
else:
sub_models[name] = load_method(new_model_save_dir)
model_and_config_save_paths = {
"text_encoder": (
new_model_save_dir / DIFFUSION_MODEL_TEXT_ENCODER_NAME / text_encoder_file_name,
new_model_save_dir / DIFFUSION_MODEL_TEXT_ENCODER_NAME / cls.sub_component_config_name,
),
"text_encoder_2": (
new_model_save_dir / DIFFUSION_MODEL_TEXT_ENCODER_2_NAME / text_encoder_2_file_name,
new_model_save_dir / DIFFUSION_MODEL_TEXT_ENCODER_2_NAME / cls.sub_component_config_name,
),
"image_encoder": (
new_model_save_dir / DIFFUSION_MODEL_IMAGE_ENCODER_NAME / image_encoder_file_name,
new_model_save_dir / DIFFUSION_MODEL_IMAGE_ENCODER_NAME / cls.sub_component_config_name,
),
"unet": (
new_model_save_dir / DIFFUSION_MODEL_UNET_NAME / unet_file_name,
new_model_save_dir / DIFFUSION_MODEL_UNET_NAME / cls.sub_component_config_name,
),
"transformer": (
new_model_save_dir / DIFFUSION_MODEL_TRANSFORMER_NAME / transformer_file_name,
new_model_save_dir / DIFFUSION_MODEL_TRANSFORMER_NAME / cls.sub_component_config_name,
),
"vae_encoder": (
new_model_save_dir / DIFFUSION_MODEL_VAE_ENCODER_NAME / vae_encoder_file_name,
new_model_save_dir / DIFFUSION_MODEL_VAE_ENCODER_NAME / cls.sub_component_config_name,
),
"vae_decoder": (
new_model_save_dir / DIFFUSION_MODEL_VAE_DECODER_NAME / vae_decoder_file_name,
new_model_save_dir / DIFFUSION_MODEL_VAE_DECODER_NAME / cls.sub_component_config_name,
),
}
# Add ControlNet paths
controlnet_model_paths = []
controlnet_config_paths = []
for path in new_model_save_dir.iterdir():
if path.is_dir() and path.name.startswith("controlnet"):
controlnet_model_paths.append(path / controlnet_file_name)
controlnet_config_paths.append(path / cls.sub_component_config_name)
model_and_config_save_paths["controlnet"] = (controlnet_model_paths, controlnet_config_paths)
# Re-build pretrained configs and neuron configs
configs, neuron_configs = {}, {}
inline_weights_to_neff = True
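        # If any submodel was compiled with its weights separated from the NEFF, the traced models must be moved
        # to the NeuronCores manually (`to_neuron=True`) when loading below.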
for name, (_, config_paths) in model_and_config_save_paths.items():
if not isinstance(config_paths, list):
config_paths = [config_paths]
sub_model_configs = []
sub_neuron_configs = []
for config_path in config_paths:
if config_path.is_file():
model_config = DiffusersPretrainedConfig.from_json_file(config_path)
neuron_config = cls._neuron_config_init(model_config)
inline_weights_to_neff = inline_weights_to_neff and neuron_config._config.neuron.get(
"inline_weights_to_neff", True
)
sub_model_configs.append(model_config)
sub_neuron_configs.append(neuron_config)
if sub_model_configs and sub_neuron_configs:
configs[name] = sub_model_configs if len(sub_model_configs) > 1 else sub_model_configs[0]
neuron_configs[name] = sub_neuron_configs if len(sub_neuron_configs) > 1 else sub_neuron_configs[0]
if data_parallel_mode is None:
data_parallel_mode = cls.set_default_dp_mode(configs)
pipe = cls.load_model(
data_parallel_mode=data_parallel_mode,
text_encoder_path=model_and_config_save_paths["text_encoder"][0],
unet_path=model_and_config_save_paths["unet"][0],
transformer_path=model_and_config_save_paths["transformer"][0],
vae_decoder_path=model_and_config_save_paths["vae_decoder"][0],
vae_encoder_path=model_and_config_save_paths["vae_encoder"][0],
text_encoder_2_path=model_and_config_save_paths["text_encoder_2"][0],
image_encoder_path=model_and_config_save_paths["image_encoder"][0],
controlnet_paths=model_and_config_save_paths["controlnet"][0],
dynamic_batch_size=neuron_configs[DIFFUSION_MODEL_TEXT_ENCODER_NAME].dynamic_batch_size,
to_neuron=not inline_weights_to_neff,
)
if model_save_dir is None:
model_save_dir = new_model_save_dir
return cls(
text_encoder=pipe.get("text_encoder"),
text_encoder_2=pipe.get("text_encoder_2"),
unet=pipe.get("unet"),
transformer=pipe.get("transformer"),
vae_encoder=pipe.get("vae_encoder"),
vae_decoder=pipe.get("vae_decoder"),
controlnet=pipe.get("controlnet"),
image_encoder=pipe.get("image_encoder"),
config=config,
tokenizer=sub_models.get("tokenizer", None),
tokenizer_2=sub_models.get("tokenizer_2", None),
scheduler=sub_models.get("scheduler"),
feature_extractor=sub_models.get("feature_extractor", None),
data_parallel_mode=data_parallel_mode,
configs=configs,
neuron_configs=neuron_configs,
model_save_dir=model_save_dir,
model_and_config_save_paths=model_and_config_save_paths,
)
@classmethod
@requires_torch_neuronx
def _from_transformers(cls, *args, **kwargs):
# Deprecate it when optimum uses `_export` as from_pretrained_method in a stable release.
return cls._export(*args, **kwargs)
@classmethod
@requires_torch_neuronx
def _export(
cls,
model_id: Union[str, Path],
config: Dict[str, Any],
torch_dtype: Optional[Union[str, torch.dtype]] = None,
unet_id: Optional[Union[str, Path]] = None,
token: Optional[Union[bool, str]] = None,
revision: str = "main",
force_download: bool = True,
cache_dir: Optional[str] = None,
compiler_workdir: Optional[str] = None,
tensor_parallel_size: Optional[int] = 1,
disable_neuron_cache: bool = False,
inline_weights_to_neff: bool = True,
optlevel: str = "2",
subfolder: str = "",
local_files_only: bool = False,
trust_remote_code: bool = False,
task: Optional[str] = None,
auto_cast: Optional[str] = "matmul",
auto_cast_type: Optional[str] = "bf16",
dynamic_batch_size: bool = False,
output_attentions: bool = False,
output_hidden_states: bool = False,
data_parallel_mode: Optional[Literal["none", "unet", "transformer", "all"]] = None,
controlnet_ids: Optional[Union[str, List[str]]] = None,
**kwargs,
) -> "NeuronDiffusionPipelineBase":
"""
Args:
model_id (`Union[str, Path]`):
Can be either:
- A string, the *model id* of a pretrained model hosted inside a model repo on huggingface.co.
Valid model ids can be located at the root-level, like `bert-base-uncased`, or namespaced under a
user or organization name, like `dbmdz/bert-base-german-cased`.
- A path to a *directory* containing a model saved using [`~OptimizedModel.save_pretrained`],
e.g., `./my_model_directory/`.
config (`Dict[str, Any]`):
A config dictionary from which the model components will be instantiated. Make sure to only load
configuration files of compatible classes.
torch_dtype (`Optional[Union[str, torch.dtype]]`, defaults to `None`):
Override the default `torch.dtype` and load the model under this dtype. If `auto` is passed, the dtype will be automatically derived from the model's weights.
unet_id (`Optional[Union[str, Path]]`, defaults to `None`):
                A string or a path pointing to the U-Net model that replaces the one in the original pipeline.
token (`Optional[Union[bool, str]]`, defaults to `None`):
The token to use as HTTP bearer authorization for remote files. If `True`, will use the token generated
when running `huggingface-cli login` (stored in `huggingface_hub.constants.HF_TOKEN_PATH`).
revision (`str`, defaults to `"main"`):
The specific model version to use (can be a branch name, tag name or commit id).
force_download (`bool`, defaults to `True`):
Whether or not to force the (re-)download of the model weights and configuration files, overriding the
cached versions if they exist.
cache_dir (`Optional[str]`, defaults to `None`):
Path to a directory in which a downloaded pretrained model configuration should be cached if the
standard cache should not be used.
compiler_workdir (`Optional[str]`, defaults to `None`):
                Path to a directory in which the neuron compiler will store all intermediary files during the compilation (neff, weights, hlo graph, ...).
            tensor_parallel_size (`Optional[int]`, defaults to `1`):
                The number of NeuronCores on which to shard the model with tensor parallelism.
disable_neuron_cache (`bool`, defaults to `False`):
Whether to disable automatic caching of compiled models. If set to True, will not load neuron cache nor cache the compiled artifacts.
inline_weights_to_neff (`bool`, defaults to `True`):
Whether to inline the weights to the neff graph. If set to False, weights will be separated from the neff.
optlevel (`str`, defaults to `"2"`):
The level of optimization the compiler should perform. Can be `"1"`, `"2"` or `"3"`, defaults to "2".
1: enables the core performance optimizations in the compiler, while also minimizing compile time.
2: provides the best balance between model performance and compile time.
3: may provide additional model execution performance but may incur longer compile times and higher host memory usage during model compilation.
subfolder (`str`, defaults to `""`):
In case the relevant files are located inside a subfolder of the model repo either locally or on huggingface.co, you can
specify the folder name here.
local_files_only (`bool`, defaults to `False`):
Whether or not to only look at local files (i.e., do not try to download the model).
trust_remote_code (`bool`, defaults to `False`):
Whether or not to allow for custom code defined on the Hub in their own modeling. This option should only be set
to `True` for repositories you trust and in which you have read the code, as it will execute code present on
the Hub on your local machine.
task (`Optional[str]`, defaults to `None`):
The task to export the model for. If not specified, the task will be auto-inferred based on the model.
auto_cast (`Optional[str]`, defaults to `"matmul"`):
Whether to cast operations from FP32 to lower precision to speed up the inference. Can be `"none"`, `"matmul"` or `"all"`.
auto_cast_type (`Optional[str]`, defaults to `"bf16"`):
The data type to cast FP32 operations to when auto-cast mode is enabled. Can be `"bf16"`, `"fp16"` or `"tf32"`.
dynamic_batch_size (`bool`, defaults to `False`):
Whether to enable dynamic batch size for neuron compiled model. If this option is enabled, the input batch size can be a multiple of the
batch size during the compilation, but it comes with a potential tradeoff in terms of latency.
            output_attentions (`bool`, defaults to `False`):
                Whether or not for the traced models to return the attention tensors of all attention layers.
            output_hidden_states (`bool`, defaults to `False`):
                Whether or not for the traced text encoders to return the hidden states of all layers.
data_parallel_mode (`Optional[Literal["none", "unet", "transformer", "all"]]`, defaults to `None`):
                Mode to decide what components to load into both NeuronCores of a Neuron device. Can be "none" (no data parallelism), "unet" (load
                only the U-Net into both cores of each device), "transformer" (load only the diffusion transformer into both cores), or "all" (load the whole pipeline into both cores).
lora_model_ids (`Optional[Union[str, List[str]]]`, defaults to `None`):
                Local paths or repo ids of the LoRA models (eg. `ostris/super-cereal-sdxl-lora`) on the Hugging Face Hub.
lora_weight_names (`Optional[Union[str, List[str]]]`, defaults to `None`):
Lora weights file names.
lora_adapter_names (`Optional[Union[str, List[str]]]`, defaults to `None`):
Adapter names to be used for referencing the loaded adapter models.
lora_scales (`Optional[List[float]]`, defaults to `None`):
Lora adapters scaling factors.
controlnet_ids (`Optional[Union[str, List[str]]]`, defaults to `None`):
                List of ControlNet model ids (eg. `thibaud/controlnet-openpose-sdxl-1.0`).
ip_adapter_ids (`Optional[Union[str, List[str]]]`, defaults to `None`):
Model ids (eg. `h94/IP-Adapter`) of IP-Adapter models hosted on the Hub or paths to local directories containing the IP-Adapter weights.
ip_adapter_subfolders (`Optional[Union[str, List[str]]]`, defaults to `None`):
The subfolder location of a model file within a larger model repository on the Hub or locally. If a list is passed, it should have the same length as `ip_adapter_weight_names`.
ip_adapter_weight_names (`Optional[Union[str, List[str]]]`, defaults to `None`):
The name of the weight file to load. If a list is passed, it should have the same length as `ip_adapter_subfolders`.
ip_adapter_scales (`Optional[Union[float, List[float]]]`, defaults to `None`):
Scaling factors for the IP-Adapters.
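
        Example of exporting and compiling a pipeline through `from_pretrained` (a minimal sketch; the model id,
        compiler arguments and input shapes below are illustrative assumptions, not prescriptions):

        ```python
        >>> from optimum.neuron import NeuronStableDiffusionPipeline

        >>> compiler_args = {"auto_cast": "matmul", "auto_cast_type": "bf16"}
        >>> input_shapes = {"batch_size": 1, "height": 512, "width": 512}
        >>> pipe = NeuronStableDiffusionPipeline.from_pretrained(
        ...     "stabilityai/stable-diffusion-2-1-base", export=True, **compiler_args, **input_shapes
        ... )
        >>> pipe.save_pretrained("sd_neuron/")
        ```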
"""
# Parse kwargs to their dataclass
parser = NeuronArgumentParser(**kwargs)
lora_args = parser.lora_args
ip_adapter_args = parser.ip_adapter_args
kwargs_shapes = asdict(parser.input_shapes)
if task is None:
if cls.task is not None:
task = cls.task
else:
task = TasksManager.infer_task_from_model(cls.auto_model_class)
# mandatory shapes
input_shapes = normalize_stable_diffusion_input_shapes(kwargs_shapes)
# Get compilation arguments
auto_cast_type = None if auto_cast is None else auto_cast_type
compiler_kwargs = {
"auto_cast": auto_cast,
"auto_cast_type": auto_cast_type,
}
pipe = TasksManager.get_model_from_task(
task=task,
model_name_or_path=model_id,
subfolder=subfolder,
revision=revision,
framework="pt",
torch_dtype=torch_dtype,
library_name=cls.library_name,
cache_dir=cache_dir,
token=token,
local_files_only=local_files_only,
force_download=force_download,
trust_remote_code=trust_remote_code,
)
submodels = {"unet": unet_id}
pipe = replace_stable_diffusion_submodels(pipe, submodels)
# Check if the cache exists
if not disable_neuron_cache:
save_dir = TemporaryDirectory()
save_dir_path = Path(save_dir.name)
# 1. Fetch all model configs
input_shapes_copy = copy.deepcopy(input_shapes)
models_and_neuron_configs, _ = load_models_and_neuron_configs(
model_name_or_path=model_id,
output=save_dir_path,
model=pipe,
task=task,
dynamic_batch_size=dynamic_batch_size,
cache_dir=cache_dir,
trust_remote_code=trust_remote_code,
subfolder=subfolder,
revision=revision,
library_name=cls.library_name,
force_download=force_download,
local_files_only=local_files_only,
token=token,
submodels=submodels,
lora_args=lora_args,
ip_adapter_args=ip_adapter_args,
output_hidden_states=output_hidden_states,
torch_dtype=torch_dtype,
controlnet_ids=controlnet_ids,
**input_shapes_copy,
)
# 2. Build compilation config
compilation_configs = {}
for name, (model, neuron_config) in models_and_neuron_configs.items():
if "vae" in name: # vae configs are not cached.
continue
model_config = model.config
if isinstance(model_config, FrozenDict):
model_config = OrderedDict(model_config)
model_config = DiffusersPretrainedConfig.from_dict(model_config)
compilation_config = store_compilation_config(
config=model_config,
input_shapes=neuron_config.input_shapes,
compiler_kwargs=compiler_kwargs,
input_names=neuron_config.inputs,
output_names=neuron_config.outputs,
dynamic_batch_size=neuron_config.dynamic_batch_size,
tensor_parallel_size=tensor_parallel_size,
compiler_type=NEURON_COMPILER_TYPE,
compiler_version=NEURON_COMPILER_VERSION,
inline_weights_to_neff=inline_weights_to_neff,
optlevel=optlevel,
model_type=getattr(neuron_config, "MODEL_TYPE", None),
task=getattr(neuron_config, "task", None),
output_attentions=output_attentions,
output_hidden_states=getattr(neuron_config, "output_hidden_states", False),
)
compilation_configs[name] = compilation_config
# 3. Lookup cached config
cache_entry = MultiModelCacheEntry(model_id=model_id, configs=compilation_configs)
compile_cache = create_hub_compile_cache_proxy()
model_cache_dir = compile_cache.default_cache.get_cache_dir_with_cache_key(f"MODULE_{cache_entry.hash}")
cache_exist = compile_cache.download_folder(model_cache_dir, model_cache_dir)
else:
cache_exist = False
if cache_exist:
# load cache
neuron_model = cls.from_pretrained(model_cache_dir, data_parallel_mode=data_parallel_mode)
# replace weights
if not inline_weights_to_neff:
neuron_model.replace_weights(weights=pipe)
return neuron_model
else:
# compile
save_dir = TemporaryDirectory()
save_dir_path = Path(save_dir.name)
main_export(
model_name_or_path=model_id,
output=save_dir_path,
compiler_kwargs=compiler_kwargs,
lora_args=lora_args,
ip_adapter_args=ip_adapter_args,
torch_dtype=torch_dtype,
task=task,
dynamic_batch_size=dynamic_batch_size,
cache_dir=cache_dir,
disable_neuron_cache=disable_neuron_cache,
compiler_workdir=compiler_workdir,
inline_weights_to_neff=inline_weights_to_neff,
optlevel=optlevel,
trust_remote_code=trust_remote_code,
subfolder=subfolder,
revision=revision,
force_download=force_download,
local_files_only=local_files_only,
token=token,
do_validation=False,
submodels={"unet": unet_id},
output_hidden_states=output_hidden_states,
controlnet_ids=controlnet_ids,
library_name=cls.library_name,
**input_shapes,
)
return cls._from_pretrained(
model_id=save_dir_path,
config=config,
model_save_dir=save_dir,
data_parallel_mode=data_parallel_mode,
)
@classmethod
def _load_config(cls, config_name_or_path: Union[str, os.PathLike], **kwargs):
return cls.load_config(config_name_or_path, **kwargs)
def _save_config(self, save_directory):
self.save_config(save_directory)
@property
def components(self) -> Dict[str, Any]:
components = {
"vae_encoder": self.vae_encoder,
"vae_decoder": self.vae_decoder,
"unet": self.unet,
"transformer": self.transformer,
"text_encoder": self.text_encoder,
"text_encoder_2": self.text_encoder_2,
"image_encoder": self.image_encoder,
"safety_checker": self.safety_checker,
"neuron_configs": self.neuron_configs,
"data_parallel_mode": self.data_parallel_mode,
"feature_extractor": self.feature_extractor,
"configs": self.configs,
"config": self.config,
"tokenizer": self.tokenizer,
"tokenizer_2": self.tokenizer_2,
"scheduler": self.scheduler,
}
return components
@property
def do_classifier_free_guidance(self):
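        # Classifier-free guidance doubles the batch fed to the unet/transformer, so it is only enabled when the
        # compiled model can absorb the doubled batch: either through dynamic batching or by running the
        # unet/transformer on both NeuronCores.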
return (
self._guidance_scale > 1
and self.unet.config.time_cond_proj_dim is None
and (
self.dynamic_batch_size
or self.data_parallel_mode == "unet"
or self.data_parallel_mode == "transformer"
)
)
def _maybe_create_dummy_image_proj_layers(self):
if all([self.image_encoder, self.encoder_hid_proj]):
self.unet.encoder_hid_proj = self.encoder_hid_proj
def __call__(self, *args, **kwargs):
# Height and width to unet/transformer (static shapes)
unet_or_transformer = self.unet or self.transformer
height = unet_or_transformer.config.neuron["static_height"] * self.vae_scale_factor
width = unet_or_transformer.config.neuron["static_width"] * self.vae_scale_factor
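        # Compiled Neuron models have static input shapes: any user-provided height/width are discarded in favor
        # of the shapes used at compilation time.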
kwargs.pop("height", None)
kwargs.pop("width", None)
if kwargs.get("image", None):
kwargs["image"] = self.image_processor.preprocess(kwargs["image"], height=height, width=width)
# Override default `max_sequence_length`, eg. pixart
if "max_sequence_length" in inspect.signature(self.auto_model_class.__call__).parameters:
kwargs["max_sequence_length"] = self.text_encoder.config.neuron.get("static_sequence_length", None)
return self.auto_model_class.__call__(self, height=height, width=width, *args, **kwargs)
class _NeuronDiffusionModelPart:
"""
    For multi-file Neuron models, represents one submodel of the pipeline.
"""
def __init__(
self,
model: torch.jit._script.ScriptModule,
parent_pipeline: NeuronDiffusionPipelineBase,
config: Optional[Union[DiffusersPretrainedConfig, PretrainedConfig]] = None,
neuron_config: Optional["NeuronDefaultConfig"] = None,
model_type: str = "unet",
device: Optional[int] = None,
):
self.model = model
self.parent_pipeline = parent_pipeline
self.config = config
self.neuron_config = neuron_config
self.model_type = model_type
self.device = device
@abstractmethod
def forward(self, *args, **kwargs):
pass
def __call__(self, *args, **kwargs):
return self.forward(*args, **kwargs)
@property
def dtype(self):
return None
def to(self, *args, **kwargs):
pass
class NeuronModelTextEncoder(_NeuronDiffusionModelPart):
def __init__(
self,
model: torch.jit._script.ScriptModule,
parent_pipeline: NeuronDiffusionPipelineBase,
config: Optional[DiffusersPretrainedConfig] = None,
neuron_config: Optional[Dict[str, str]] = None,
):
super().__init__(model, parent_pipeline, config, neuron_config, DIFFUSION_MODEL_TEXT_ENCODER_NAME)
def forward(
self,
input_ids: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = True,
):
if output_hidden_states:
assert (
self.config.output_hidden_states or self.config.neuron.get("output_hidden_states")
) == output_hidden_states, (
"output_hidden_states is expected to be False since the model was compiled without hidden_states as output."
)
input_ids = input_ids.to(torch.long) # dummy generator uses long int for tracing
inputs = (input_ids,)
if attention_mask is not None and not torch.all(attention_mask == 1):
inputs += (attention_mask,)
outputs = self.model(*inputs)
if return_dict:
outputs = ModelOutput(dict(zip(self.neuron_config.outputs, outputs)))
return outputs
def modules(self):
# dummy func for passing `unscale_lora_layers`.
return []
class NeuronModelImageEncoder(_NeuronDiffusionModelPart):
def __init__(
self,
model: torch.jit._script.ScriptModule,
parent_pipeline: NeuronDiffusionPipelineBase,
config: Optional[DiffusersPretrainedConfig] = None,
neuron_config: Optional[Dict[str, str]] = None,
):
super().__init__(model, parent_pipeline, config, neuron_config, DIFFUSION_MODEL_IMAGE_ENCODER_NAME)
def forward(
self,
pixel_values: torch.FloatTensor,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
interpolate_pos_encoding: bool = False,
return_dict: Optional[bool] = True,
):
inputs = (pixel_values,)
outputs = self.model(*inputs)
if return_dict:
outputs = ModelOutput(dict(zip(self.neuron_config.outputs, outputs)))
return outputs
    # Create dummy parameters to be compatible with `https://github.com/huggingface/diffusers/blob/c14057c8dbc32847bac9082bcc0ae00c9a19357d/src/diffusers/pipelines/stable_diffusion/pipeline_stable_diffusion.py#L514`
def parameters(self):
class DummyObject:
def __init__(self):
self.dtype = None
return iter([DummyObject()])
class NeuronModelUnet(_NeuronDiffusionModelPart):
def __init__(
self,
model: torch.jit._script.ScriptModule,
parent_pipeline: NeuronDiffusionPipelineBase,
config: Optional[DiffusersPretrainedConfig] = None,
neuron_config: Optional[Dict[str, str]] = None,
):
super().__init__(model, parent_pipeline, config, neuron_config, DIFFUSION_MODEL_UNET_NAME)
if hasattr(self.model, "device"):
self.device = self.model.device
def forward(
self,
sample: torch.Tensor,
timestep: torch.Tensor,
encoder_hidden_states: torch.Tensor,
added_cond_kwargs: Optional[Dict[str, torch.Tensor]] = None,
down_block_additional_residuals: Optional[Tuple[torch.Tensor]] = None,
mid_block_additional_residual: Optional[torch.Tensor] = None,
timestep_cond: Optional[torch.Tensor] = None,
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
return_dict: bool = True,
):
if cross_attention_kwargs is not None:
            logger.warning("`cross_attention_kwargs` is not yet supported by the traced model and will be ignored.")
timestep = timestep.float().expand((sample.shape[0],))
inputs = (sample, timestep, encoder_hidden_states)
if timestep_cond is not None:
inputs = inputs + (timestep_cond,)
if mid_block_additional_residual is not None:
inputs = inputs + (mid_block_additional_residual,)
if down_block_additional_residuals is not None:
for idx in range(len(down_block_additional_residuals)):
inputs = inputs + (down_block_additional_residuals[idx],)
if added_cond_kwargs:
optional_inputs_names = ["text_embeds", "time_ids", "image_embeds"]
for optional_input_name in optional_inputs_names:
optional_input = added_cond_kwargs.get(optional_input_name, None)
if isinstance(optional_input, List):
optional_input = (
torch.stack(optional_input, dim=0) if len(optional_input) > 1 else optional_input[0]
)
if optional_input is not None:
inputs = inputs + (optional_input,)
outputs = self.model(*inputs)
if return_dict:
outputs = ModelOutput(dict(zip(self.neuron_config.outputs, outputs)))
return outputs
class NeuronModelTransformer(_NeuronDiffusionModelPart):
def __init__(
self,
model: torch.jit._script.ScriptModule,
parent_pipeline: NeuronDiffusionPipelineBase,
config: Optional[DiffusersPretrainedConfig] = None,
neuron_config: Optional[Dict[str, str]] = None,
):
super().__init__(model, parent_pipeline, config, neuron_config, DIFFUSION_MODEL_TRANSFORMER_NAME)
def forward(
self,
hidden_states: torch.Tensor,
encoder_hidden_states: Optional[torch.Tensor] = None,
timestep: Optional[torch.LongTensor] = None,
        added_cond_kwargs: Optional[Dict[str, torch.Tensor]] = None,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
attention_mask: Optional[torch.Tensor] = None,
encoder_attention_mask: Optional[torch.Tensor] = None,
return_dict: bool = True,
):
inputs = (hidden_states, encoder_hidden_states, timestep, encoder_attention_mask)
outputs = self.model(*inputs)
if return_dict:
outputs = ModelOutput(dict(zip(self.neuron_config.outputs, outputs)))
return outputs
class NeuronModelVaeEncoder(_NeuronDiffusionModelPart):
def __init__(
self,
model: torch.jit._script.ScriptModule,
parent_pipeline: NeuronDiffusionPipelineBase,
config: Optional[DiffusersPretrainedConfig] = None,
neuron_config: Optional[Dict[str, str]] = None,
):
super().__init__(model, parent_pipeline, config, neuron_config, DIFFUSION_MODEL_VAE_ENCODER_NAME)
def forward(self, sample: torch.Tensor, return_dict: bool = True):
inputs = (sample,)
outputs = self.model(*inputs)
if "latent_parameters" in outputs:
outputs["latent_dist"] = DiagonalGaussianDistribution(parameters=outputs.pop("latent_parameters"))
if not return_dict:
return tuple(output for output in outputs.values())
else:
return AutoencoderKLOutput(latent_dist=outputs["latent_dist"])
class NeuronModelVaeDecoder(_NeuronDiffusionModelPart):
def __init__(
self,
model: torch.jit._script.ScriptModule,
parent_pipeline: NeuronDiffusionPipelineBase,
config: Optional[DiffusersPretrainedConfig] = None,
neuron_config: Optional[Dict[str, str]] = None,
):
super().__init__(model, parent_pipeline, config, neuron_config, DIFFUSION_MODEL_VAE_DECODER_NAME)
def forward(
self,
latent_sample: torch.Tensor,
image: Optional[torch.Tensor] = None,
mask: Optional[torch.Tensor] = None,
return_dict: bool = True,
generator=None,
):
inputs = (latent_sample,)
if image is not None:
inputs += (image,)
if mask is not None:
inputs += (mask,)
outputs = self.model(*inputs)
if not return_dict:
return tuple(output for output in outputs.values())
else:
return DecoderOutput(**outputs)
class NeuronModelVae(_NeuronDiffusionModelPart):
def __init__(
self,
encoder: Optional[NeuronModelVaeEncoder],
decoder: NeuronModelVaeDecoder,
):
self.encoder = encoder
self.decoder = decoder
@property
def config(self):
return self.decoder.config
def encode(self, *args, **kwargs):
return self.encoder(*args, **kwargs)
def decode(self, *args, **kwargs):
return self.decoder(*args, **kwargs)
class NeuronControlNetModel(_NeuronDiffusionModelPart):
auto_model_class = ControlNetModel
library_name = "diffusers"
base_model_prefix = "neuron_model"
config_name = "model_index.json"
sub_component_config_name = "config.json"
def __init__(
self,
model: torch.jit._script.ScriptModule,
parent_pipeline: NeuronDiffusionPipelineBase,
config: Optional[DiffusersPretrainedConfig] = None,
neuron_config: Optional[Dict[str, str]] = None,
):
super().__init__(model, parent_pipeline, config, neuron_config, DIFFUSION_MODEL_CONTROLNET_NAME)
def forward(
self,
sample: torch.Tensor,
timestep: Union[torch.Tensor, float, int],
encoder_hidden_states: torch.Tensor,
controlnet_cond: torch.Tensor,
conditioning_scale: float = 1.0,
guess_mode: bool = False,
added_cond_kwargs: Optional[Dict] = None,
return_dict: bool = True,
) -> Union["ControlNetOutput", Tuple[Tuple[torch.Tensor, ...], torch.Tensor]]:
timestep = timestep.expand((sample.shape[0],)).to(torch.long)
inputs = (sample, timestep, encoder_hidden_states, controlnet_cond, conditioning_scale)
if added_cond_kwargs:
text_embeds = added_cond_kwargs.pop("text_embeds", None)
time_ids = added_cond_kwargs.pop("time_ids", None)
inputs += (text_embeds, time_ids)
outputs = self.model(*inputs)
if guess_mode:
            logger.info(
                "Guess mode is not yet supported. Please file an issue at: https://github.com/huggingface/optimum-neuron/issues."
            )
if return_dict:
outputs = ControlNetOutput(dict(zip(self.neuron_config.outputs, outputs)))
return outputs
@property
def __class__(self):
return ControlNetModel
class NeuronMultiControlNetModel(_NeuronDiffusionModelPart):
auto_model_class = MultiControlNetModel
library_name = "diffusers"
base_model_prefix = "neuron_model"
config_name = "model_index.json"
sub_component_config_name = "config.json"
def __init__(
self,
models: List[torch.jit._script.ScriptModule],
parent_pipeline: NeuronTracedModel,
config: Optional[DiffusersPretrainedConfig] = None,
neuron_config: Optional[Dict[str, str]] = None,
):
self.nets = models
self.parent_pipeline = parent_pipeline
self.config = config
self.neuron_config = neuron_config
self.model_type = DIFFUSION_MODEL_CONTROLNET_NAME
self.device = None
def forward(
self,
sample: torch.Tensor,
timestep: Union[torch.Tensor, float, int],
encoder_hidden_states: torch.Tensor,
controlnet_cond: torch.Tensor,
conditioning_scale: List[float],
guess_mode: bool = False,
return_dict: bool = True,
) -> Union["ControlNetOutput", Tuple[Tuple[torch.Tensor, ...], torch.Tensor]]:
if guess_mode:
            logger.info(
                "Guess mode is not yet supported. Please file an issue at: https://github.com/huggingface/optimum-neuron/issues."
            )
for i, (image, scale, controlnet) in enumerate(zip(controlnet_cond, conditioning_scale, self.nets)):
inputs = (sample, timestep, encoder_hidden_states, image, scale)
down_samples, mid_sample = controlnet(*inputs)
# merge samples
if i == 0:
down_block_res_samples, mid_block_res_sample = down_samples, mid_sample
else:
down_block_res_samples = [
samples_prev + samples_curr
for samples_prev, samples_curr in zip(down_block_res_samples, down_samples)
]
mid_block_res_sample += mid_sample
if return_dict:
return ControlNetOutput(
down_block_res_samples=down_block_res_samples, mid_block_res_sample=mid_block_res_sample
)
return down_block_res_samples, mid_block_res_sample
@property
def __class__(self):
return MultiControlNetModel
class NeuronStableDiffusionPipeline(NeuronDiffusionPipelineBase, StableDiffusionPipeline):
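    """
    Neuron-compiled pipeline for text-to-image generation with Stable Diffusion.

    Example (a minimal sketch; `sd_neuron/` is assumed to contain a pipeline previously exported with
    `export=True` and saved with `save_pretrained`):

    ```python
    >>> from optimum.neuron import NeuronStableDiffusionPipeline

    >>> pipe = NeuronStableDiffusionPipeline.from_pretrained("sd_neuron/")
    >>> prompt = "a photo of an astronaut riding a horse on mars"
    >>> image = pipe(prompt).images[0]
    ```
    """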
main_input_name = "prompt"
auto_model_class = StableDiffusionPipeline
class DummyEncoderHidProj:
def __init__(self):
self.image_projection_layers = ModuleList(
[IPAdapterFullImageProjection()]
) # TODO: support multiple IP adapters
encoder_hid_proj = DummyEncoderHidProj()
class NeuronStableDiffusionImg2ImgPipeline(NeuronDiffusionPipelineBase, StableDiffusionImg2ImgPipeline):
main_input_name = "image"
auto_model_class = StableDiffusionImg2ImgPipeline
class NeuronStableDiffusionInpaintPipeline(NeuronDiffusionPipelineBase, StableDiffusionInpaintPipeline):
main_input_name = "prompt"
auto_model_class = StableDiffusionInpaintPipeline
class NeuronStableDiffusionInstructPix2PixPipeline(
NeuronDiffusionPipelineBase, StableDiffusionInstructPix2PixPipeline
):
main_input_name = "prompt"
    task = "image-to-image"
auto_model_class = StableDiffusionInstructPix2PixPipeline
class NeuronLatentConsistencyModelPipeline(NeuronDiffusionPipelineBase, LatentConsistencyModelPipeline):
main_input_name = "prompt"
auto_model_class = LatentConsistencyModelPipeline
class NeuronStableDiffusionControlNetPipeline(
NeuronStableDiffusionControlNetPipelineMixin, NeuronDiffusionPipelineBase, StableDiffusionControlNetPipeline
):
main_input_name = "prompt"
auto_model_class = StableDiffusionControlNetPipeline
class NeuronPixArtAlphaPipeline(NeuronDiffusionPipelineBase, PixArtAlphaPipeline):
main_input_name = "prompt"
auto_model_class = PixArtAlphaPipeline
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.image_processor = PixArtImageProcessor(vae_scale_factor=self.vae_scale_factor)
class NeuronPixArtSigmaPipeline(NeuronDiffusionPipelineBase, PixArtSigmaPipeline):
main_input_name = "prompt"
auto_model_class = PixArtSigmaPipeline
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.image_processor = PixArtImageProcessor(vae_scale_factor=self.vae_scale_factor)
class NeuronStableDiffusionXLPipeline(
NeuronStableDiffusionXLPipelineMixin, NeuronDiffusionPipelineBase, StableDiffusionXLPipeline
):
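    """
    Neuron-compiled pipeline for text-to-image generation with Stable Diffusion XL.

    Example (a minimal sketch; `sd_neuron_xl/` is assumed to contain a pipeline previously exported with
    `export=True` and saved with `save_pretrained`):

    ```python
    >>> from optimum.neuron import NeuronStableDiffusionXLPipeline

    >>> pipe = NeuronStableDiffusionXLPipeline.from_pretrained("sd_neuron_xl/")
    >>> image = pipe("a futuristic city at sunset, highly detailed").images[0]
    ```
    """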
main_input_name = "prompt"
auto_model_class = StableDiffusionXLPipeline
class DummyEncoderHidProj:
def __init__(self):
self.image_projection_layers = ModuleList([ImageProjection()]) # TODO: support multiple IP adapters
encoder_hid_proj = DummyEncoderHidProj()
class NeuronStableDiffusionXLImg2ImgPipeline(
NeuronStableDiffusionXLPipelineMixin, NeuronDiffusionPipelineBase, StableDiffusionXLImg2ImgPipeline
):
main_input_name = "prompt"
auto_model_class = StableDiffusionXLImg2ImgPipeline
class NeuronStableDiffusionXLInpaintPipeline(
NeuronStableDiffusionXLPipelineMixin, NeuronDiffusionPipelineBase, StableDiffusionXLInpaintPipeline
):
main_input_name = "image"
auto_model_class = StableDiffusionXLInpaintPipeline
class NeuronStableDiffusionXLControlNetPipeline(
NeuronStableDiffusionXLControlNetPipelineMixin, NeuronDiffusionPipelineBase, StableDiffusionXLControlNetPipeline
):
main_input_name = "prompt"
auto_model_class = StableDiffusionXLControlNetPipeline