# optimum/intel/openvino/modeling_diffusion.py
# Copyright 2022 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import inspect
import logging
import os
import shutil
from abc import abstractmethod
from collections import OrderedDict
from copy import deepcopy
from pathlib import Path
from tempfile import gettempdir
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
import openvino
import torch
from diffusers import (
AutoPipelineForImage2Image,
AutoPipelineForInpainting,
AutoPipelineForText2Image,
DiffusionPipeline,
LatentConsistencyModelImg2ImgPipeline,
LatentConsistencyModelPipeline,
StableDiffusionImg2ImgPipeline,
StableDiffusionInpaintPipeline,
StableDiffusionPipeline,
StableDiffusionXLImg2ImgPipeline,
StableDiffusionXLInpaintPipeline,
StableDiffusionXLPipeline,
pipelines,
)
from diffusers.configuration_utils import ConfigMixin
from diffusers.schedulers import SchedulerMixin
from diffusers.schedulers.scheduling_utils import SCHEDULER_CONFIG_NAME
from diffusers.utils.constants import CONFIG_NAME
from huggingface_hub import snapshot_download
from huggingface_hub.constants import HUGGINGFACE_HUB_CACHE
from huggingface_hub.utils import validate_hf_hub_args
from openvino import Core
from openvino._offline_transformations import compress_model_transformation
from transformers import CLIPFeatureExtractor, CLIPTokenizer
from transformers.modeling_outputs import ModelOutput
from transformers.utils import http_user_agent
from optimum.utils import (
DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER,
DIFFUSION_MODEL_TEXT_ENCODER_SUBFOLDER,
DIFFUSION_MODEL_UNET_SUBFOLDER,
DIFFUSION_MODEL_VAE_DECODER_SUBFOLDER,
DIFFUSION_MODEL_VAE_ENCODER_SUBFOLDER,
)
from ...exporters.openvino import main_export
from ..utils.import_utils import is_diffusers_version, is_openvino_version, is_transformers_version
from .configuration import OVConfig, OVQuantizationMethod, OVWeightQuantizationConfig
from .loaders import OVTextualInversionLoaderMixin
from .modeling_base import OVBaseModel
from .utils import (
ONNX_WEIGHTS_NAME,
OV_TO_PT_TYPE,
OV_XML_FILE_NAME,
TemporaryDirectory,
_print_compiled_model_properties,
check_scale_available,
model_has_dynamic_inputs,
np_to_pt_generators,
)
if is_diffusers_version(">=", "0.25.0"):
from diffusers.models.autoencoders.vae import DiagonalGaussianDistribution
else:
from diffusers.models.vae import DiagonalGaussianDistribution
# LTXPipeline requires the EncoderDecoderCache object introduced in transformers 4.45
if is_diffusers_version(">=", "0.32") and is_transformers_version(">=", "4.45"):
from diffusers import LTXPipeline
else:
LTXPipeline = object
if is_diffusers_version(">=", "0.29.0"):
from diffusers import StableDiffusion3Img2ImgPipeline, StableDiffusion3Pipeline
else:
StableDiffusion3Pipeline, StableDiffusion3Img2ImgPipeline = object, object
if is_diffusers_version(">=", "0.30.0"):
from diffusers import FluxPipeline, StableDiffusion3InpaintPipeline
else:
StableDiffusion3InpaintPipeline = object
FluxPipeline = object
if is_diffusers_version(">=", "0.31.0"):
from diffusers import FluxImg2ImgPipeline, FluxInpaintPipeline
else:
FluxImg2ImgPipeline = object
FluxInpaintPipeline = object
if is_diffusers_version(">=", "0.32.0"):
from diffusers import FluxFillPipeline, SanaPipeline
else:
FluxFillPipeline = object
SanaPipeline = object
if is_diffusers_version(">=", "0.33.0"):
from diffusers import SanaSprintPipeline
else:
SanaSprintPipeline = object
DIFFUSION_MODEL_TRANSFORMER_SUBFOLDER = "transformer"
DIFFUSION_MODEL_TEXT_ENCODER_3_SUBFOLDER = "text_encoder_3"
core = Core()
logger = logging.getLogger(__name__)
# TODO: support DiffusionPipeline.from_pipe()
# TODO: makes more sense to have a compositional OVMixin class
# TODO: instead of one bloated __init__, we should consider an __init__ per pipeline
class OVDiffusionPipeline(OVBaseModel, DiffusionPipeline):
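    """
    Base class for OpenVINO diffusion pipelines. It bridges `OVBaseModel` (OpenVINO model loading,
    compilation and quantization) with `diffusers.DiffusionPipeline` (module and config registration,
    plus the denoising loop inherited from the corresponding diffusers pipeline).
    """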
auto_model_class = DiffusionPipeline
config_name = "model_index.json"
_library_name = "diffusers"
def __init__(
self,
scheduler: SchedulerMixin,
unet: Optional[openvino.Model] = None,
vae_decoder: Optional[openvino.Model] = None,
# optional pipeline models
vae_encoder: Optional[openvino.Model] = None,
text_encoder: Optional[openvino.Model] = None,
text_encoder_2: Optional[openvino.Model] = None,
text_encoder_3: Optional[openvino.Model] = None,
transformer: Optional[openvino.Model] = None,
# optional pipeline submodels
tokenizer: Optional[CLIPTokenizer] = None,
tokenizer_2: Optional[CLIPTokenizer] = None,
tokenizer_3: Optional[CLIPTokenizer] = None,
feature_extractor: Optional[CLIPFeatureExtractor] = None,
# stable diffusion xl specific arguments
force_zeros_for_empty_prompt: bool = True,
requires_aesthetics_score: bool = False,
add_watermarker: Optional[bool] = None,
# openvino specific arguments
device: str = "CPU",
compile: bool = True,
compile_only: bool = False,
dynamic_shapes: bool = True,
ov_config: Optional[Dict[str, str]] = None,
model_save_dir: Optional[Union[str, Path, TemporaryDirectory]] = None,
quantization_config: Optional[Union[OVWeightQuantizationConfig, Dict]] = None,
**kwargs,
):
self._device = device.upper()
self.is_dynamic = dynamic_shapes
self._compile_only = compile_only
self.model_save_dir = model_save_dir
self.ov_config = {} if ov_config is None else {**ov_config}
self.preprocessors = kwargs.get("preprocessors", [])
if self._compile_only:
if not compile:
                raise ValueError(
                    "`compile_only` mode does not support disabling compilation. "
                    "Please provide `compile=True` if you want to use `compile_only=True` or set `compile_only=False`."
                )
main_model = unet if unet is not None else transformer
if not isinstance(main_model, openvino.CompiledModel):
raise ValueError("`compile_only` expect that already compiled model will be provided")
model_is_dynamic = model_has_dynamic_inputs(main_model)
if dynamic_shapes ^ model_is_dynamic:
requested_shapes = "dynamic" if dynamic_shapes else "static"
compiled_shapes = "dynamic" if model_is_dynamic else "static"
raise ValueError(
f"Provided compiled model with {compiled_shapes} shapes but requested to use {requested_shapes}. "
f"Please set `compile_only=False` or `dynamic_shapes={model_is_dynamic}`"
)
self.unet = OVModelUnet(unet, self, DIFFUSION_MODEL_UNET_SUBFOLDER) if unet is not None else None
self.transformer = (
OVModelTransformer(transformer, self, DIFFUSION_MODEL_TRANSFORMER_SUBFOLDER)
if transformer is not None
else None
)
if unet is None and transformer is None:
raise ValueError("`unet` or `transformer` model should be provided for pipeline work")
self.vae_decoder = OVModelVaeDecoder(vae_decoder, self, DIFFUSION_MODEL_VAE_DECODER_SUBFOLDER)
self.vae_encoder = (
OVModelVaeEncoder(vae_encoder, self, DIFFUSION_MODEL_VAE_ENCODER_SUBFOLDER)
if vae_encoder is not None
else None
)
self.text_encoder = (
OVModelTextEncoder(text_encoder, self, DIFFUSION_MODEL_TEXT_ENCODER_SUBFOLDER)
if text_encoder is not None
else None
)
self.text_encoder_2 = (
OVModelTextEncoder(text_encoder_2, self, DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER)
if text_encoder_2 is not None
else None
)
self.text_encoder_3 = (
OVModelTextEncoder(text_encoder_3, self, DIFFUSION_MODEL_TEXT_ENCODER_3_SUBFOLDER)
if text_encoder_3 is not None
else None
)
# We wrap the VAE Decoder & Encoder in a single object to simulate diffusers API
self.vae = OVModelVae(decoder=self.vae_decoder, encoder=self.vae_encoder)
self.scheduler = scheduler
self.tokenizer = tokenizer
self.tokenizer_2 = tokenizer_2
self.tokenizer_3 = tokenizer_3
self.feature_extractor = feature_extractor
# we allow passing these as torch models for now
        self.image_encoder = kwargs.pop("image_encoder", None)  # TODO: maybe implement OVModelImageEncoder
        self.safety_checker = kwargs.pop("safety_checker", None)  # TODO: maybe implement OVModelSafetyChecker
all_pipeline_init_args = {
"vae": self.vae,
"unet": self.unet,
"transformer": self.transformer,
"text_encoder": self.text_encoder,
"text_encoder_2": self.text_encoder_2,
"text_encoder_3": self.text_encoder_3,
"safety_checker": self.safety_checker,
"image_encoder": self.image_encoder,
"scheduler": self.scheduler,
"tokenizer": self.tokenizer,
"tokenizer_2": self.tokenizer_2,
"tokenizer_3": self.tokenizer_3,
"feature_extractor": self.feature_extractor,
"requires_aesthetics_score": requires_aesthetics_score,
"force_zeros_for_empty_prompt": force_zeros_for_empty_prompt,
"add_watermarker": add_watermarker,
}
diffusers_pipeline_args = {}
for key in inspect.signature(self.auto_model_class).parameters.keys():
if key in all_pipeline_init_args:
diffusers_pipeline_args[key] = all_pipeline_init_args[key]
# inits diffusers pipeline specific attributes (registers modules and config)
self.auto_model_class.__init__(self, **diffusers_pipeline_args)
# we use auto_model_class.__init__ here because we can't call super().__init__
# as OptimizedModel already defines an __init__ which is the first in the MRO
self._openvino_config = None
if quantization_config:
self._openvino_config = OVConfig(quantization_config=quantization_config)
self._set_ov_config_parameters()
if self.is_dynamic and not self._compile_only:
self.reshape(batch_size=-1, height=-1, width=-1, num_images_per_prompt=-1)
if compile and not self._compile_only:
self.compile()
@property
def ov_submodels(self) -> Dict[str, openvino.Model]:
return {name: getattr(getattr(self, name), "model") for name in self._ov_submodel_names}
@property
def _ov_submodel_names(self) -> List[str]:
submodel_name_candidates = [
"unet",
"transformer",
"vae_decoder",
"vae_encoder",
"text_encoder",
"text_encoder_2",
"text_encoder_3",
]
submodel_names = [name for name in submodel_name_candidates if getattr(self, name) is not None]
return submodel_names
def _save_pretrained(self, save_directory: Union[str, Path]):
"""
Saves the model to the OpenVINO IR format so that it can be re-loaded using the
[`~optimum.intel.openvino.modeling.OVModel.from_pretrained`] class method.
Arguments:
            save_directory (`str` or `Path`):
                The directory where the model files will be saved.
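
        Example (a minimal sketch; the directory name is illustrative):

        ```python
        pipeline.save_pretrained("ov_pipeline_dir")
        pipeline = OVStableDiffusionPipeline.from_pretrained("ov_pipeline_dir")
        ```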
"""
if self._compile_only:
raise ValueError(
"`save_pretrained()` is not supported with `compile_only` mode, please initialize model without this option"
)
save_directory = Path(save_directory)
models_to_save_paths = {
(self.unet, save_directory / DIFFUSION_MODEL_UNET_SUBFOLDER),
(self.vae_decoder, save_directory / DIFFUSION_MODEL_VAE_DECODER_SUBFOLDER),
(self.vae_encoder, save_directory / DIFFUSION_MODEL_VAE_ENCODER_SUBFOLDER),
(self.text_encoder, save_directory / DIFFUSION_MODEL_TEXT_ENCODER_SUBFOLDER),
(self.text_encoder_2, save_directory / DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER),
(self.text_encoder_3, save_directory / DIFFUSION_MODEL_TEXT_ENCODER_3_SUBFOLDER),
(self.transformer, save_directory / DIFFUSION_MODEL_TRANSFORMER_SUBFOLDER),
}
for model, save_path in models_to_save_paths:
if model is not None:
dst_path = save_path / OV_XML_FILE_NAME
dst_path.parent.mkdir(parents=True, exist_ok=True)
openvino.save_model(model.model, dst_path, compress_to_fp16=False)
model_dir = (
self.model_save_dir
if not isinstance(self.model_save_dir, TemporaryDirectory)
else self.model_save_dir.name
)
config_path = Path(model_dir) / save_path.name / CONFIG_NAME
if config_path.is_file():
config_save_path = save_path / CONFIG_NAME
shutil.copyfile(config_path, config_save_path)
else:
if hasattr(model, "save_config"):
model.save_config(save_path)
elif hasattr(model, "config") and hasattr(model.config, "save_pretrained"):
model.config.save_pretrained(save_path)
self.scheduler.save_pretrained(save_directory / "scheduler")
if self.tokenizer is not None:
self.tokenizer.save_pretrained(save_directory / "tokenizer")
if self.tokenizer_2 is not None:
self.tokenizer_2.save_pretrained(save_directory / "tokenizer_2")
if self.tokenizer_3 is not None:
self.tokenizer_3.save_pretrained(save_directory / "tokenizer_3")
if self.feature_extractor is not None:
self.feature_extractor.save_pretrained(save_directory / "feature_extractor")
if getattr(self, "safety_checker", None) is not None:
self.safety_checker.save_pretrained(save_directory / "safety_checker")
self._save_openvino_config(save_directory)
def _save_config(self, save_directory):
"""
Saves a model configuration into a directory, so that it can be re-loaded using the
[`from_pretrained`] class method.
"""
model_dir = (
self.model_save_dir
if not isinstance(self.model_save_dir, TemporaryDirectory)
else self.model_save_dir.name
)
save_dir = Path(save_directory)
original_config = Path(model_dir) / self.config_name
if original_config.exists():
if not save_dir.exists():
save_dir.mkdir(parents=True)
shutil.copy(original_config, save_dir)
else:
self.config.save_pretrained(save_dir)
@classmethod
def _from_pretrained(
cls,
model_id: Union[str, Path],
config: Dict[str, Any],
token: Optional[Union[bool, str]] = None,
revision: Optional[str] = None,
force_download: bool = False,
local_files_only: bool = False,
cache_dir: str = HUGGINGFACE_HUB_CACHE,
unet_file_name: Optional[str] = None,
vae_decoder_file_name: Optional[str] = None,
vae_encoder_file_name: Optional[str] = None,
text_encoder_file_name: Optional[str] = None,
text_encoder_2_file_name: Optional[str] = None,
text_encoder_3_file_name: Optional[str] = None,
transformer_file_name: Optional[str] = None,
from_onnx: bool = False,
load_in_8bit: bool = False,
quantization_config: Union[OVWeightQuantizationConfig, Dict] = None,
model_save_dir: Optional[Union[str, Path, TemporaryDirectory]] = None,
**kwargs,
):
        # same as DiffusionPipeline.from_pretrained: if called directly, load the class specified in the config
if cls.__name__ == "OVDiffusionPipeline":
class_name = config["_class_name"]
ov_pipeline_class = _get_ov_class(class_name)
else:
ov_pipeline_class = cls
default_file_name = ONNX_WEIGHTS_NAME if from_onnx else OV_XML_FILE_NAME
unet_file_name = unet_file_name or default_file_name
vae_encoder_file_name = vae_encoder_file_name or default_file_name
vae_decoder_file_name = vae_decoder_file_name or default_file_name
text_encoder_file_name = text_encoder_file_name or default_file_name
text_encoder_2_file_name = text_encoder_2_file_name or default_file_name
text_encoder_3_file_name = text_encoder_3_file_name or default_file_name
transformer_file_name = transformer_file_name or default_file_name
if not os.path.isdir(str(model_id)):
all_components = {key for key in config.keys() if not key.startswith("_")} | {"vae_encoder", "vae_decoder"}
allow_patterns = {os.path.join(component, "*") for component in all_components}
allow_patterns.update(
{
unet_file_name,
transformer_file_name,
vae_encoder_file_name,
vae_decoder_file_name,
text_encoder_file_name,
text_encoder_2_file_name,
text_encoder_3_file_name,
unet_file_name.replace(".xml", ".bin"),
transformer_file_name.replace(".xml", ".bin"),
vae_encoder_file_name.replace(".xml", ".bin"),
vae_decoder_file_name.replace(".xml", ".bin"),
text_encoder_file_name.replace(".xml", ".bin"),
text_encoder_2_file_name.replace(".xml", ".bin"),
text_encoder_3_file_name.replace(".xml", ".bin"),
SCHEDULER_CONFIG_NAME,
cls.config_name,
CONFIG_NAME,
}
)
ignore_patterns = ["*.msgpack", "*.safetensors", "*pytorch_model.bin"]
if not from_onnx:
ignore_patterns.extend(["*.onnx", "*.onnx_data"])
model_save_folder = snapshot_download(
model_id,
cache_dir=cache_dir,
force_download=force_download,
local_files_only=local_files_only,
revision=revision,
token=token,
user_agent=http_user_agent,
allow_patterns=allow_patterns,
ignore_patterns=ignore_patterns,
)
else:
model_save_folder = str(model_id)
model_save_path = Path(model_save_folder)
if model_save_dir is None:
model_save_dir = model_save_path
submodels = {
"scheduler": None,
"tokenizer": None,
"tokenizer_2": None,
"tokenizer_3": None,
"feature_extractor": None,
"safety_checker": None,
"image_encoder": None,
}
for name in submodels.keys():
if name in kwargs:
submodels[name] = kwargs.pop(name)
elif config.get(name, (None, None))[0] is not None:
module_name, module_class = config.get(name)
if hasattr(pipelines, module_name):
module = getattr(pipelines, module_name)
else:
module = importlib.import_module(module_name)
class_obj = getattr(module, module_class)
load_method = getattr(class_obj, "from_pretrained")
# Check if the module is in a subdirectory
if (model_save_path / name).is_dir():
submodels[name] = load_method(model_save_path / name)
                # For backward compatibility with models exported with previous optimum versions, where saving the safety_checker was disabled
                elif name == "safety_checker":
                    logger.warning(
                        "The pipeline config contains a `safety_checker` subcomponent, but `safety_checker` is not available in the model directory. "
                        "`safety_checker` will be disabled. If you want to enable it, please pass it explicitly to the `from_pretrained` method "
                        "or re-export the model with a newer optimum-intel version."
                    )
submodels[name] = None
else:
submodels[name] = load_method(model_save_path)
models = {
"unet": model_save_path / DIFFUSION_MODEL_UNET_SUBFOLDER / unet_file_name,
"transformer": model_save_path / DIFFUSION_MODEL_TRANSFORMER_SUBFOLDER / transformer_file_name,
"vae_decoder": model_save_path / DIFFUSION_MODEL_VAE_DECODER_SUBFOLDER / vae_decoder_file_name,
"vae_encoder": model_save_path / DIFFUSION_MODEL_VAE_ENCODER_SUBFOLDER / vae_encoder_file_name,
"text_encoder": model_save_path / DIFFUSION_MODEL_TEXT_ENCODER_SUBFOLDER / text_encoder_file_name,
"text_encoder_2": model_save_path / DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER / text_encoder_2_file_name,
"text_encoder_3": model_save_path / DIFFUSION_MODEL_TEXT_ENCODER_3_SUBFOLDER / text_encoder_3_file_name,
}
for config_key, value in config.items():
if config_key not in models and config_key not in kwargs and config_key not in submodels:
kwargs[config_key] = value
compile_only = kwargs.get("compile_only", False)
quantization_config = cls._prepare_quantization_config(quantization_config, load_in_8bit)
if (quantization_config is None or quantization_config.dataset is None) and not compile_only:
for name, path in models.items():
if name in kwargs:
models[name] = kwargs.pop(name)
else:
models[name] = cls.load_model(path, quantization_config) if path.is_file() else None
elif compile_only:
ov_config = kwargs.get("ov_config", {})
device = kwargs.get("device", "CPU")
            vae_ov_config = {**ov_config}
            if (
                "GPU" in device.upper()
                and "INFERENCE_PRECISION_HINT" not in vae_ov_config
                and is_openvino_version("<=", "2025.0")
            ):
                vae_model_path = models["vae_decoder"]
                required_upcast = check_scale_available(vae_model_path)
                if required_upcast:
                    vae_ov_config["INFERENCE_PRECISION_HINT"] = "f32"
for name, path in models.items():
if name in kwargs:
models[name] = kwargs.pop(name)
else:
models[name] = (
cls._compile_model(
path,
device,
ov_config if "vae" not in name else vae_ov_conifg,
Path(model_save_dir) / name,
)
if path.is_file()
else None
)
else:
# why is this quantization not performed in __init__?
if ov_pipeline_class.export_feature != "text-to-image":
raise NotImplementedError(f"Quantization is not supported for {cls.__name__}")
from optimum.intel import OVQuantizer
for name, path in models.items():
if name in kwargs:
models[name] = kwargs.pop(name)
else:
models[name] = cls.load_model(path) if path.is_file() else None
ov_pipeline = ov_pipeline_class(**models, **submodels, model_save_dir=model_save_dir, **kwargs)
# same as in DiffusionPipeline.from_pretrained, we save where the model was instantiated from
ov_pipeline.register_to_config(_name_or_path=config.get("_name_or_path", str(model_id)))
quantizer = OVQuantizer(ov_pipeline)
if isinstance(quantization_config, OVWeightQuantizationConfig):
hybrid_quantization_config = deepcopy(quantization_config)
hybrid_quantization_config.quant_method = OVQuantizationMethod.HYBRID
quantizer.quantize(ov_config=OVConfig(quantization_config=hybrid_quantization_config))
else:
quantizer.quantize(ov_config=OVConfig(quantization_config=quantization_config))
return ov_pipeline
ov_pipeline = ov_pipeline_class(
**models,
**submodels,
model_save_dir=model_save_dir,
quantization_config=quantization_config,
**kwargs,
)
# same as in DiffusionPipeline.from_pretrained, we save where the model was instantiated from
ov_pipeline.register_to_config(_name_or_path=config.get("_name_or_path", str(model_id)))
return ov_pipeline
@classmethod
def _export(
cls,
model_id: str,
config: Dict[str, Any],
token: Optional[Union[bool, str]] = None,
revision: Optional[str] = None,
force_download: bool = False,
cache_dir: str = HUGGINGFACE_HUB_CACHE,
local_files_only: bool = False,
load_in_8bit: Optional[bool] = None,
quantization_config: Union[OVWeightQuantizationConfig, Dict] = None,
compile_only: bool = False,
**kwargs,
):
if compile_only:
            logger.warning(
                "`compile_only` mode will be disabled because it does not support model export. "
                "Please provide an OpenVINO model obtained using optimum-cli or saved on disk using `save_pretrained`."
            )
compile_only = False
        # If load_in_8bit and quantization_config are not specified, ov_config is set
        # to None and will be chosen by default during conversion depending on the model size
if load_in_8bit is None and not quantization_config:
ov_config = None
else:
ov_config = OVConfig(dtype="auto")
torch_dtype = kwargs.pop("torch_dtype", None)
model_loading_kwargs = {}
if torch_dtype is not None:
model_loading_kwargs["torch_dtype"] = torch_dtype
model_save_dir = TemporaryDirectory()
model_save_path = Path(model_save_dir.name)
variant = kwargs.pop("variant", None)
main_export(
model_name_or_path=model_id,
output=model_save_path,
do_validation=False,
no_post_process=True,
revision=revision,
cache_dir=cache_dir,
task=cls.export_feature,
token=token,
local_files_only=local_files_only,
force_download=force_download,
ov_config=ov_config,
library_name=cls._library_name,
variant=variant,
model_loading_kwargs=model_loading_kwargs,
)
return cls._from_pretrained(
model_id=model_save_path,
config=config,
from_onnx=False,
token=token,
revision=revision,
cache_dir=cache_dir,
force_download=force_download,
local_files_only=local_files_only,
model_save_dir=model_save_dir,
quantization_config=quantization_config,
load_in_8bit=load_in_8bit,
compile_only=compile_only,
**kwargs,
)
def to(self, *args, device: Optional[str] = None, dtype: Optional[torch.dtype] = None):
for arg in args:
if isinstance(arg, str):
device = arg
elif isinstance(arg, torch.dtype):
dtype = arg
if isinstance(device, str):
self._device = device.upper()
self.clear_requests()
elif device is not None:
raise ValueError(
"The `device` argument should be a string representing the device on which the model should be loaded."
)
if dtype is not None and dtype != self.dtype:
raise NotImplementedError(
f"Cannot change the dtype of the model from {self.dtype} to {dtype}. "
f"Please export the model with the desired dtype."
)
return self
@property
def height(self) -> int:
model = self.vae.decoder.model
height = model.inputs[0].get_partial_shape()[-2]
if height.is_dynamic:
return -1
return height.get_length() * (
self.vae_scale_factor if hasattr(self, "vae_scale_factor") else self.vae_spatial_compression_ratio
)
@property
def width(self) -> int:
model = self.vae.decoder.model
width = model.inputs[0].get_partial_shape()[-1]
if width.is_dynamic:
return -1
return width.get_length() * (
self.vae_scale_factor if hasattr(self, "vae_scale_factor") else self.vae_spatial_compression_ratio
)
@property
def batch_size(self) -> int:
model = self.unet.model if self.unet is not None else self.transformer.model
batch_size = model.inputs[0].get_partial_shape()[0]
if batch_size.is_dynamic:
return -1
return batch_size.get_length()
def _reshape_unet(
self,
model: openvino.Model,
batch_size: int = -1,
height: int = -1,
width: int = -1,
num_images_per_prompt: int = -1,
tokenizer_max_length: int = -1,
):
if batch_size == -1 or num_images_per_prompt == -1:
batch_size = -1
else:
batch_size *= num_images_per_prompt
# The factor of 2 comes from the guidance scale > 1
if "timestep_cond" not in {inputs.get_any_name() for inputs in model.inputs}:
batch_size *= 2
height = height // self.vae_scale_factor if height > 0 else height
width = width // self.vae_scale_factor if width > 0 else width
shapes = {}
for inputs in model.inputs:
shapes[inputs] = inputs.get_partial_shape()
if inputs.get_any_name() == "timestep":
if shapes[inputs].rank == 1:
shapes[inputs][0] = 1
elif inputs.get_any_name() == "sample":
in_channels = self.unet.config.get("in_channels", None)
if in_channels is None:
in_channels = shapes[inputs][1]
if in_channels.is_dynamic:
                        logger.warning(
                            "Could not identify `in_channels` from the UNet configuration; to statically reshape the UNet, please provide a configuration."
                        )
self.is_dynamic = True
shapes[inputs] = [batch_size, in_channels, height, width]
elif inputs.get_any_name() == "text_embeds":
shapes[inputs] = [batch_size, self.text_encoder_2.config["projection_dim"]]
elif inputs.get_any_name() == "time_ids":
shapes[inputs] = [batch_size, inputs.get_partial_shape()[1]]
elif inputs.get_any_name() == "timestep_cond":
shapes[inputs] = [batch_size, self.unet.config["time_cond_proj_dim"]]
else:
shapes[inputs][0] = batch_size
shapes[inputs][1] = tokenizer_max_length
model.reshape(shapes)
return model
def _reshape_transformer(
self,
model: openvino.Model,
batch_size: int = -1,
height: int = -1,
width: int = -1,
num_images_per_prompt: int = -1,
tokenizer_max_length: int = -1,
num_frames: int = -1,
):
if batch_size == -1 or num_images_per_prompt == -1:
batch_size = -1
else:
# The factor of 2 comes from the guidance scale > 1
batch_size *= num_images_per_prompt
if "img_ids" not in {inputs.get_any_name() for inputs in model.inputs}:
batch_size *= 2
is_ltx = self.__class__.__name__.startswith("OVLTX")
if is_ltx:
height = height // self.vae_spatial_compression_ratio if height > 0 else -1
width = width // self.vae_spatial_compression_ratio if width > 0 else -1
packed_height_width = width * height * num_frames if height > 0 and width > 0 and num_frames > 0 else -1
else:
height = height // self.vae_scale_factor if height > 0 else height
width = width // self.vae_scale_factor if width > 0 else width
packed_height = height // 2 if height > 0 else height
packed_width = width // 2 if width > 0 else width
packed_height_width = packed_width * packed_height if height > 0 and width > 0 else -1
shapes = {}
for inputs in model.inputs:
shapes[inputs] = inputs.get_partial_shape()
if inputs.get_any_name() in ["timestep", "guidance"]:
shapes[inputs][0] = batch_size
elif inputs.get_any_name() == "hidden_states":
in_channels = self.transformer.config.get("in_channels", None)
if in_channels is None:
in_channels = (
shapes[inputs][1] if inputs.get_partial_shape().rank.get_length() == 4 else shapes[inputs][2]
)
if in_channels.is_dynamic:
                        logger.warning(
                            "Could not identify `in_channels` from the transformer configuration; to statically reshape the transformer, please provide a configuration."
                        )
self.is_dynamic = True
if inputs.get_partial_shape().rank.get_length() == 4:
shapes[inputs] = [batch_size, in_channels, height, width]
else:
shapes[inputs] = [batch_size, packed_height_width, in_channels]
elif inputs.get_any_name() == "pooled_projections":
shapes[inputs] = [batch_size, self.transformer.config["pooled_projection_dim"]]
elif inputs.get_any_name() == "img_ids":
shapes[inputs] = (
[batch_size, packed_height_width, 3]
if is_diffusers_version("<", "0.31.0")
else [packed_height_width, 3]
)
elif inputs.get_any_name() == "txt_ids":
shapes[inputs] = [batch_size, -1, 3] if is_diffusers_version("<", "0.31.0") else [-1, 3]
elif inputs.get_any_name() in ["height", "width", "num_frames", "rope_interpolation_scale"]:
shapes[inputs] = inputs.get_partial_shape()
else:
shapes[inputs][0] = batch_size
                shapes[inputs][1] = -1  # text_encoder_3 may have varying input length
model.reshape(shapes)
return model
def _reshape_text_encoder(self, model: openvino.Model, batch_size: int = -1, tokenizer_max_length: int = -1):
if batch_size != -1:
shapes = {input_tensor: [batch_size, tokenizer_max_length] for input_tensor in model.inputs}
model.reshape(shapes)
return model
def _reshape_vae_encoder(
self,
model: openvino.Model,
batch_size: int = -1,
height: int = -1,
width: int = -1,
num_frames: int = -1,
):
in_channels = self.vae_encoder.config.get("in_channels", None)
if in_channels is None:
in_channels = model.inputs[0].get_partial_shape()[1]
if in_channels.is_dynamic:
                logger.warning(
                    "Could not identify `in_channels` from the VAE encoder configuration; to statically reshape the VAE encoder, please provide a configuration."
                )
self.is_dynamic = True
shapes = {
model.inputs[0]: [batch_size, in_channels, height, width]
if model.inputs[0].get_partial_shape().rank.get_length() == 4
else [batch_size, in_channels, num_frames, height, width]
}
model.reshape(shapes)
return model
def _reshape_vae_decoder(
self,
model: openvino.Model,
height: int = -1,
width: int = -1,
num_images_per_prompt: int = -1,
num_frames: int = -1,
):
is_ltx = self.__class__.__name__.startswith("OVLTX")
if is_ltx:
height = height // self.vae_spatial_compression_ratio if height > 0 else -1
width = width // self.vae_spatial_compression_ratio if width > 0 else -1
else:
height = height // self.vae_scale_factor if height > -1 else height
width = width // self.vae_scale_factor if width > -1 else width
latent_channels = self.vae_decoder.config.get("latent_channels", None)
if latent_channels is None:
latent_channels = model.inputs[0].get_partial_shape()[1]
if latent_channels.is_dynamic:
                logger.warning(
                    "Could not identify `latent_channels` from the VAE decoder configuration; to statically reshape the VAE decoder, please provide a configuration."
                )
self.is_dynamic = True
shapes = {
model.inputs[0]: [num_images_per_prompt, latent_channels, height, width]
if not is_ltx
else [num_images_per_prompt, latent_channels, num_frames, height, width]
}
model.reshape(shapes)
return model
def reshape(self, batch_size: int, height: int, width: int, num_images_per_prompt: int = -1, num_frames: int = -1):
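        """
        Statically reshape all submodels to the given dimensions; pass -1 to keep a dimension dynamic.

        Example (a minimal sketch; static shapes can reduce memory usage and speed up inference on some devices):

        ```python
        pipeline.reshape(batch_size=1, height=512, width=512, num_images_per_prompt=1)
        pipeline.compile()
        ```
        """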
if self._compile_only:
raise ValueError(
"`reshape()` is not supported with `compile_only` mode, please initialize model without this option"
)
self.is_dynamic = -1 in {batch_size, height, width, num_images_per_prompt}
if self.tokenizer is None and self.tokenizer_2 is None:
tokenizer_max_len = -1
else:
if self.tokenizer is not None and "Gemma" in self.tokenizer.__class__.__name__:
tokenizer_max_len = -1
else:
tokenizer_max_len = (
getattr(self.tokenizer, "model_max_length", -1)
if self.tokenizer is not None
else getattr(self.tokenizer_2, "model_max_length", -1)
)
if self.unet is not None:
self.unet.model = self._reshape_unet(
self.unet.model, batch_size, height, width, num_images_per_prompt, tokenizer_max_len
)
if self.transformer is not None:
self.transformer.model = self._reshape_transformer(
self.transformer.model,
batch_size,
height,
width,
num_images_per_prompt,
tokenizer_max_len,
num_frames=num_frames,
)
self.vae_decoder.model = self._reshape_vae_decoder(
self.vae_decoder.model, height, width, num_images_per_prompt, num_frames=num_frames
)
if self.vae_encoder is not None:
self.vae_encoder.model = self._reshape_vae_encoder(
self.vae_encoder.model, batch_size, height, width, num_frames=num_frames
)
if self.text_encoder is not None:
self.text_encoder.model = self._reshape_text_encoder(
                # GemmaTokenizer uses inf as model_max_length; the text encoder in LTX does not pad inputs to model_max_length
self.text_encoder.model,
batch_size,
(
getattr(self.tokenizer, "model_max_length", -1)
if "Gemma" not in self.tokenizer.__class__.__name__
and not self.__class__.__name__.startswith("OVLTX")
else -1
),
)
if self.text_encoder_2 is not None:
self.text_encoder_2.model = self._reshape_text_encoder(
self.text_encoder_2.model, batch_size, getattr(self.tokenizer_2, "model_max_length", -1)
)
if self.text_encoder_3 is not None:
self.text_encoder_3.model = self._reshape_text_encoder(self.text_encoder_3.model, batch_size, -1)
self.clear_requests()
return self
def half(self):
"""
Converts all the model weights to FP16 for more efficient inference on GPU.
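
        Example (a minimal sketch):

        ```python
        pipeline = pipeline.half()
        pipeline.compile()
        ```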
"""
if self._compile_only:
raise ValueError(
"`half()` is not supported with `compile_only` mode, please initialize model without this option"
)
for submodel in self.ov_submodels.values():
compress_model_transformation(submodel)
self.clear_requests()
return self
def clear_requests(self):
if self._compile_only:
raise ValueError(
"`clear_requests()` is not supported with `compile_only` mode, please initialize model without this option"
)
for submodel_name in self._ov_submodel_names:
getattr(self, submodel_name).request = None
def compile(self):
for submodel_name in self._ov_submodel_names:
getattr(self, submodel_name)._compile()
@classmethod
def _load_config(cls, config_name_or_path: Union[str, os.PathLike], **kwargs):
return cls.load_config(config_name_or_path, **kwargs)
@property
def components(self) -> Dict[str, Any]:
components = {
"vae": self.vae,
"unet": self.unet,
"transformer": self.transformer,
"text_encoder": self.text_encoder,
"text_encoder_2": self.text_encoder_2,
"text_encoder_3": self.text_encoder_2,
"safety_checker": self.safety_checker,
"image_encoder": self.image_encoder,
}
components = {k: v for k, v in components.items() if v is not None}
return components
def __call__(self, *args, **kwargs):
# we do this to keep numpy random states support for now
# TODO: deprecate and add warnings when a random state is passed
args = list(args)
for i in range(len(args)):
args[i] = np_to_pt_generators(args[i], self.device)
for k, v in kwargs.items():
kwargs[k] = np_to_pt_generators(v, self.device)
height, width = None, None
height_idx, width_idx = None, None
shapes_overridden = False
sig = inspect.signature(self.auto_model_class.__call__)
sig_height_idx = list(sig.parameters).index("height") if "height" in sig.parameters else len(sig.parameters)
sig_width_idx = list(sig.parameters).index("width") if "width" in sig.parameters else len(sig.parameters)
if "height" in kwargs:
height = kwargs["height"]
elif len(args) > sig_height_idx:
height = args[sig_height_idx]
height_idx = sig_height_idx
if "width" in kwargs:
width = kwargs["width"]
elif len(args) > sig_width_idx:
width = args[sig_width_idx]
width_idx = sig_width_idx
if self.height != -1:
if height is not None and height != self.height:
logger.warning(f"Incompatible height argument provided {height}. Pipeline only support {self.height}.")
height = self.height
else:
height = self.height
if height_idx is not None:
args[height_idx] = height
else:
kwargs["height"] = height
shapes_overridden = True
if self.width != -1:
if width is not None and width != self.width:
logger.warning(f"Incompatible widtth argument provided {width}. Pipeline only support {self.width}.")
width = self.width
else:
width = self.width
if width_idx is not None:
args[width_idx] = width
else:
kwargs["width"] = width
shapes_overridden = True
        # By default, Sana generates images at a specific resolution grid size and then resizes them to the requested size,
        # which may conflict with the pipeline height / width. Disable this behavior for static shape pipelines.
if self.auto_model_class.__name__.startswith("Sana") and shapes_overridden:
            sig_resolution_binning_idx = (
                list(sig.parameters).index("use_resolution_binning")
                if "use_resolution_binning" in sig.parameters
                else len(sig.parameters)
            )
            if len(args) > sig_resolution_binning_idx:
                args[sig_resolution_binning_idx] = False
            else:
                kwargs["use_resolution_binning"] = False
# we use auto_model_class.__call__ here because we can't call super().__call__
# as OptimizedModel already defines a __call__ which is the first in the MRO
return self.auto_model_class.__call__(self, *args, **kwargs)
class OVPipelinePart(ConfigMixin):
config_name: str = CONFIG_NAME
def __init__(
self,
model: openvino.Model,
parent_pipeline: OVDiffusionPipeline,
model_name: str = "",
):
self.model = model
self.model_name = model_name
self.parent_pipeline = parent_pipeline
self.request = None if not parent_pipeline._compile_only else self.model
self.ov_config = parent_pipeline.ov_config
if isinstance(parent_pipeline.model_save_dir, TemporaryDirectory):
self.model_save_dir = Path(parent_pipeline.model_save_dir.name) / self.model_name
else:
self.model_save_dir = Path(parent_pipeline.model_save_dir) / self.model_name
config_file_path = self.model_save_dir / self.config_name
if not config_file_path.is_file():
# config is mandatory for the model part to be used for inference
raise ValueError(f"Configuration file for {self.__class__.__name__} not found at {config_file_path}")
config_dict = self._dict_from_json_file(config_file_path)
self.register_to_config(**config_dict)
@property
def _device(self) -> str:
return self.parent_pipeline._device
@property
def device(self) -> torch.device:
return self.parent_pipeline.device
@property
def dtype(self) -> torch.dtype:
return OV_TO_PT_TYPE[self.ov_config.get("dtype", "f32")]
def _compile(self):
if self.request is None:
if (
"CACHE_DIR" not in self.ov_config.keys()
and not str(self.model_save_dir).startswith(gettempdir())
and "GPU" in self._device
):
self.ov_config["CACHE_DIR"] = os.path.join(self.model_save_dir, "model_cache")
logger.info(f"Compiling the {self.model_name} to {self._device} ...")
self.request = core.compile_model(self.model, self._device, self.ov_config)
# OPENVINO_LOG_LEVEL can be found in https://docs.openvino.ai/2023.2/openvino_docs_OV_UG_supported_plugins_AUTO_debugging.html
if "OPENVINO_LOG_LEVEL" in os.environ and int(os.environ["OPENVINO_LOG_LEVEL"]) > 2:
_print_compiled_model_properties(self.request)
def to(self, *args, device: Optional[str] = None, dtype: Optional[torch.dtype] = None):
for arg in args:
if isinstance(arg, str):
device = arg
elif isinstance(arg, torch.dtype):
dtype = arg
if isinstance(device, str):
self._device = device.upper()
self.request = None
elif device is not None:
raise ValueError(
"The `device` argument should be a string representing the device on which the model should be loaded."
)
if dtype is not None and dtype != self.dtype:
raise NotImplementedError(
f"Cannot change the dtype of the model from {self.dtype} to {dtype}. "
f"Please export the model with the desired dtype."
)
return self
@abstractmethod
def forward(self, *args, **kwargs):
raise NotImplementedError
def __call__(self, *args, **kwargs):
return self.forward(*args, **kwargs)
def modules(self):
return []
class OVModelTextEncoder(OVPipelinePart):
def __init__(self, model: openvino.Model, parent_pipeline: OVDiffusionPipeline, model_name: str = ""):
super().__init__(model, parent_pipeline, model_name)
self.hidden_states_output_names = [
name for out in self.model.outputs for name in out.names if name.startswith("hidden_states")
]
self.input_names = [inp.get_any_name() for inp in self.model.inputs]
def forward(
self,
input_ids: Union[np.ndarray, torch.Tensor],
attention_mask: Optional[Union[np.ndarray, torch.Tensor]] = None,
output_hidden_states: Optional[bool] = None,
return_dict: bool = False,
):
self._compile()
model_inputs = {"input_ids": input_ids}
if "attention_mask" in self.input_names:
model_inputs["attention_mask"] = attention_mask
ov_outputs = self.request(model_inputs, share_inputs=True)
main_out = ov_outputs[0]
model_outputs = {}
model_outputs[self.model.outputs[0].get_any_name()] = torch.from_numpy(main_out)
if len(self.model.outputs) > 1 and "pooler_output" in self.model.outputs[1].get_any_name():
model_outputs["pooler_output"] = torch.from_numpy(ov_outputs[1])
if self.hidden_states_output_names and "last_hidden_state" not in model_outputs:
model_outputs["last_hidden_state"] = torch.from_numpy(ov_outputs[self.hidden_states_output_names[-1]])
        if self.hidden_states_output_names and (
            output_hidden_states or getattr(self.config, "output_hidden_states", False)
        ):
hidden_states = [torch.from_numpy(ov_outputs[out_name]) for out_name in self.hidden_states_output_names]
model_outputs["hidden_states"] = hidden_states
if return_dict:
return model_outputs
return ModelOutput(**model_outputs)
class OVModelUnet(OVPipelinePart):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not hasattr(self.config, "time_cond_proj_dim"):
            logger.warning(
                "The `time_cond_proj_dim` attribute is missing from the UNet configuration. "
                "Please re-export the model with a newer version of optimum and diffusers."
            )
self.register_to_config(time_cond_proj_dim=None)
def forward(
self,
sample: Union[np.ndarray, torch.Tensor],
timestep: Union[np.ndarray, torch.Tensor],
encoder_hidden_states: Union[np.ndarray, torch.Tensor],
text_embeds: Optional[Union[np.ndarray, torch.Tensor]] = None,
time_ids: Optional[Union[np.ndarray, torch.Tensor]] = None,
timestep_cond: Optional[Union[np.ndarray, torch.Tensor]] = None,
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
added_cond_kwargs: Optional[Dict[str, Any]] = None,
return_dict: bool = False,
):
self._compile()
model_inputs = {
"sample": sample,
"timestep": timestep,
"encoder_hidden_states": encoder_hidden_states,
}
if text_embeds is not None:
model_inputs["text_embeds"] = text_embeds
if time_ids is not None:
model_inputs["time_ids"] = time_ids
if timestep_cond is not None:
model_inputs["timestep_cond"] = timestep_cond
if cross_attention_kwargs is not None:
model_inputs.update(cross_attention_kwargs)
if added_cond_kwargs is not None:
model_inputs.update(added_cond_kwargs)
ov_outputs = self.request(model_inputs, share_inputs=True).to_dict()
model_outputs = {}
for key, value in ov_outputs.items():
model_outputs[next(iter(key.names))] = torch.from_numpy(value)
if return_dict:
return model_outputs
return ModelOutput(**model_outputs)
class OVModelTransformer(OVPipelinePart):
def forward(
self,
hidden_states: torch.FloatTensor,
encoder_hidden_states: torch.FloatTensor = None,
pooled_projections: torch.FloatTensor = None,
timestep: torch.LongTensor = None,
img_ids: torch.Tensor = None,
txt_ids: torch.Tensor = None,
guidance: torch.Tensor = None,
block_controlnet_hidden_states: List = None,
joint_attention_kwargs: Optional[Dict[str, Any]] = None,
encoder_attention_mask: torch.LongTensor = None,
num_frames: Optional[int] = None,
height: Optional[int] = None,
width: Optional[int] = None,
rope_interpolation_scale: Optional[Union[Tuple[float, float, float], torch.Tensor]] = None,
video_coords: Optional[torch.Tensor] = None,
attention_kwargs: Optional[Dict[str, Any]] = None,
return_dict: bool = True,
):
self._compile()
model_inputs = {
"hidden_states": hidden_states,
"timestep": timestep,
"encoder_hidden_states": encoder_hidden_states,
}
if pooled_projections is not None:
model_inputs["pooled_projections"] = pooled_projections
if img_ids is not None:
model_inputs["img_ids"] = img_ids
if txt_ids is not None:
model_inputs["txt_ids"] = txt_ids
if guidance is not None:
model_inputs["guidance"] = guidance
if encoder_attention_mask is not None:
model_inputs["encoder_attention_mask"] = encoder_attention_mask
if num_frames is not None:
model_inputs["num_frames"] = num_frames
if height is not None:
model_inputs["height"] = height
if width is not None:
model_inputs["width"] = width
if rope_interpolation_scale is not None:
if not isinstance(rope_interpolation_scale, torch.Tensor):
rope_interpolation_scale = torch.tensor(rope_interpolation_scale)
model_inputs["rope_interpolation_scale"] = rope_interpolation_scale
ov_outputs = self.request(model_inputs, share_inputs=True).to_dict()
model_outputs = {}
for key, value in ov_outputs.items():
model_outputs[next(iter(key.names))] = torch.from_numpy(value)
if return_dict:
return model_outputs
return ModelOutput(**model_outputs)
class OVModelVaeEncoder(OVPipelinePart):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not hasattr(self.config, "scaling_factor"):
            logger.warning(
                "The `scaling_factor` attribute is missing from the VAE encoder configuration. "
                "Please re-export the model with a newer version of optimum and diffusers."
            )
self.register_to_config(scaling_factor=2 ** (len(self.config.block_out_channels) - 1))
def forward(
self,
sample: Union[np.ndarray, torch.Tensor],
generator: Optional[torch.Generator] = None,
return_dict: bool = False,
):
self._compile()
model_inputs = {"sample": sample}
ov_outputs = self.request(model_inputs, share_inputs=True).to_dict()
model_outputs = {}
for key, value in ov_outputs.items():
model_outputs[next(iter(key.names))] = torch.from_numpy(value)
if "latent_sample" in model_outputs:
model_outputs["latents"] = model_outputs.pop("latent_sample")
if "latent_parameters" in model_outputs:
model_outputs["latent_dist"] = DiagonalGaussianDistribution(
parameters=model_outputs.pop("latent_parameters")
)
if return_dict:
return model_outputs
return ModelOutput(**model_outputs)
def _compile(self):
if (
"GPU" in self._device
and "INFERENCE_PRECISION_HINT" not in self.ov_config
and is_openvino_version("<", "2025.0")
and check_scale_available(self.model)
):
self.ov_config.update({"INFERENCE_PRECISION_HINT": "f32"})
super()._compile()
class OVModelVaeDecoder(OVPipelinePart):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# can be missing from models exported long ago
if not hasattr(self.config, "scaling_factor"):
            logger.warning(
                "The `scaling_factor` attribute is missing from the VAE decoder configuration. "
                "Please re-export the model with a newer version of optimum and diffusers."
            )
self.register_to_config(scaling_factor=2 ** (len(self.config.block_out_channels) - 1))
def forward(
self,
latent_sample: Union[np.ndarray, torch.Tensor],
timestep: Optional[Union[np.ndarray, torch.Tensor]] = None,
generator: Optional[torch.Generator] = None,
return_dict: bool = False,
):
self._compile()
model_inputs = {"latent_sample": latent_sample}
if timestep is not None:
model_inputs["timestep"] = timestep
ov_outputs = self.request(model_inputs, share_inputs=True).to_dict()
model_outputs = {}
for key, value in ov_outputs.items():
model_outputs[next(iter(key.names))] = torch.from_numpy(value)
if return_dict:
return model_outputs
return ModelOutput(**model_outputs)
def _compile(self):
if (
"GPU" in self._device
and "INFERENCE_PRECISION_HINT" not in self.ov_config
and is_openvino_version("<", "2025.0")
and check_scale_available(self.model)
):
self.ov_config.update({"INFERENCE_PRECISION_HINT": "f32"})
super()._compile()
class OVModelVae:
def __init__(self, decoder: OVModelVaeDecoder, encoder: OVModelVaeEncoder):
self.decoder = decoder
self.encoder = encoder
self.spatial_compression_ratio, self.temporal_compression_ratio = None, None
if hasattr(self.decoder.config, "spatio_temporal_scaling"):
patch_size = self.decoder.config.patch_size
patch_size_t = self.decoder.config.patch_size_t
spatio_temporal_scaling = self.decoder.config.spatio_temporal_scaling
self.spatial_compression_ratio = patch_size * 2 ** sum(spatio_temporal_scaling)
self.temporal_compression_ratio = patch_size_t * 2 ** sum(spatio_temporal_scaling)
self.latents_mean, self.latents_std = None, None
if hasattr(self.decoder.config, "latents_mean_data"):
self.latents_mean = torch.tensor(self.decoder.config.latents_mean_data)
if hasattr(self.decoder.config, "latents_std_data"):
self.latents_std = torch.tensor(self.decoder.config.latents_std_data)
@property
def config(self):
return self.decoder.config
@property
def dtype(self):
return self.decoder.dtype
@property
def device(self):
return self.decoder.device
def decode(self, *args, **kwargs):
return self.decoder(*args, **kwargs)
def encode(self, *args, **kwargs):
return self.encoder(*args, **kwargs)
def to(self, *args, **kwargs):
self.decoder.to(*args, **kwargs)
if self.encoder is not None:
self.encoder.to(*args, **kwargs)
class OVStableDiffusionPipeline(OVDiffusionPipeline, OVTextualInversionLoaderMixin, StableDiffusionPipeline):
"""
OpenVINO-powered stable diffusion pipeline corresponding to [diffusers.StableDiffusionPipeline](https://huggingface.co/docs/diffusers/api/pipelines/stable_diffusion/stable_diffusion#diffusers.StableDiffusionPipeline).
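
    Example (a minimal usage sketch; the checkpoint id is illustrative, any compatible Stable Diffusion checkpoint works):

    ```python
    from optimum.intel import OVStableDiffusionPipeline

    # export=True converts the PyTorch checkpoint to OpenVINO IR on the fly
    pipeline = OVStableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", export=True)
    image = pipeline("sailing ship in storm by Rembrandt").images[0]
    ```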
"""
main_input_name = "prompt"
export_feature = "text-to-image"
auto_model_class = StableDiffusionPipeline
class OVStableDiffusionImg2ImgPipeline(
OVDiffusionPipeline, OVTextualInversionLoaderMixin, StableDiffusionImg2ImgPipeline
):
"""
OpenVINO-powered stable diffusion pipeline corresponding to [diffusers.StableDiffusionImg2ImgPipeline](https://huggingface.co/docs/diffusers/api/pipelines/stable_diffusion/stable_diffusion_img2img#diffusers.StableDiffusionImg2ImgPipeline).
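
    Example (a minimal usage sketch; the checkpoint id and image path are illustrative):

    ```python
    from diffusers.utils import load_image
    from optimum.intel import OVStableDiffusionImg2ImgPipeline

    pipeline = OVStableDiffusionImg2ImgPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", export=True)
    init_image = load_image("input.png").resize((512, 512))
    image = pipeline(prompt="a fantasy landscape", image=init_image, strength=0.75).images[0]
    ```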
"""
main_input_name = "image"
export_feature = "image-to-image"
auto_model_class = StableDiffusionImg2ImgPipeline
class OVStableDiffusionInpaintPipeline(
OVDiffusionPipeline, OVTextualInversionLoaderMixin, StableDiffusionInpaintPipeline
):
"""
OpenVINO-powered stable diffusion pipeline corresponding to [diffusers.StableDiffusionInpaintPipeline](https://huggingface.co/docs/diffusers/api/pipelines/stable_diffusion/stable_diffusion_inpaint#diffusers.StableDiffusionInpaintPipeline).
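
    Example (a minimal usage sketch; the checkpoint id and image paths are illustrative):

    ```python
    from diffusers.utils import load_image
    from optimum.intel import OVStableDiffusionInpaintPipeline

    pipeline = OVStableDiffusionInpaintPipeline.from_pretrained("runwayml/stable-diffusion-inpainting", export=True)
    image = load_image("photo.png").resize((512, 512))
    mask = load_image("mask.png").resize((512, 512))  # white pixels are repainted
    image = pipeline(prompt="a red bench in a park", image=image, mask_image=mask).images[0]
    ```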
"""
main_input_name = "image"
export_feature = "inpainting"
auto_model_class = StableDiffusionInpaintPipeline
class OVStableDiffusionXLPipeline(OVDiffusionPipeline, OVTextualInversionLoaderMixin, StableDiffusionXLPipeline):
"""
OpenVINO-powered stable diffusion pipeline corresponding to [diffusers.StableDiffusionXLPipeline](https://huggingface.co/docs/diffusers/api/pipelines/stable_diffusion/stable_diffusion_xl#diffusers.StableDiffusionXLPipeline).
"""
main_input_name = "prompt"
export_feature = "text-to-image"
auto_model_class = StableDiffusionXLPipeline
def _get_add_time_ids(
self,
original_size,
crops_coords_top_left,
target_size,
dtype,
text_encoder_projection_dim=None,
):
add_time_ids = list(original_size + crops_coords_top_left + target_size)
add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
return add_time_ids
class OVStableDiffusionXLImg2ImgPipeline(
OVDiffusionPipeline, OVTextualInversionLoaderMixin, StableDiffusionXLImg2ImgPipeline
):
"""
OpenVINO-powered stable diffusion pipeline corresponding to [diffusers.StableDiffusionXLImg2ImgPipeline](https://huggingface.co/docs/diffusers/api/pipelines/stable_diffusion/stable_diffusion_xl#diffusers.StableDiffusionXLImg2ImgPipeline).
"""
main_input_name = "image"
export_feature = "image-to-image"
auto_model_class = StableDiffusionXLImg2ImgPipeline
def _get_add_time_ids(
self,
original_size,
crops_coords_top_left,
target_size,
aesthetic_score,
negative_aesthetic_score,
negative_original_size,
negative_crops_coords_top_left,
negative_target_size,
dtype,
text_encoder_projection_dim=None,
):
if self.config.requires_aesthetics_score:
add_time_ids = list(original_size + crops_coords_top_left + (aesthetic_score,))
add_neg_time_ids = list(
negative_original_size + negative_crops_coords_top_left + (negative_aesthetic_score,)
)
else:
add_time_ids = list(original_size + crops_coords_top_left + target_size)
add_neg_time_ids = list(negative_original_size + crops_coords_top_left + negative_target_size)
add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
add_neg_time_ids = torch.tensor([add_neg_time_ids], dtype=dtype)
return add_time_ids, add_neg_time_ids
class OVStableDiffusionXLInpaintPipeline(
OVDiffusionPipeline, OVTextualInversionLoaderMixin, StableDiffusionXLInpaintPipeline
):
"""
OpenVINO-powered stable diffusion pipeline corresponding to [diffusers.StableDiffusionXLInpaintPipeline](https://huggingface.co/docs/diffusers/api/pipelines/stable_diffusion/stable_diffusion_xl#diffusers.StableDiffusionXLInpaintPipeline).
"""
main_input_name = "image"
export_feature = "inpainting"
auto_model_class = StableDiffusionXLInpaintPipeline
def _get_add_time_ids(
self,
original_size,
crops_coords_top_left,
target_size,
aesthetic_score,
negative_aesthetic_score,
negative_original_size,
negative_crops_coords_top_left,
negative_target_size,
dtype,
text_encoder_projection_dim=None,
):
if self.config.requires_aesthetics_score:
add_time_ids = list(original_size + crops_coords_top_left + (aesthetic_score,))
add_neg_time_ids = list(
negative_original_size + negative_crops_coords_top_left + (negative_aesthetic_score,)
)
else:
add_time_ids = list(original_size + crops_coords_top_left + target_size)
add_neg_time_ids = list(negative_original_size + crops_coords_top_left + negative_target_size)
add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
add_neg_time_ids = torch.tensor([add_neg_time_ids], dtype=dtype)
return add_time_ids, add_neg_time_ids
class OVLatentConsistencyModelPipeline(
OVDiffusionPipeline, OVTextualInversionLoaderMixin, LatentConsistencyModelPipeline
):
"""
OpenVINO-powered stable diffusion pipeline corresponding to [diffusers.LatentConsistencyModelPipeline](https://huggingface.co/docs/diffusers/api/pipelines/stable_diffusion/latent_consistency#diffusers.LatentConsistencyModelPipeline).
"""
main_input_name = "prompt"
export_feature = "text-to-image"
auto_model_class = LatentConsistencyModelPipeline
class OVLatentConsistencyModelImg2ImgPipeline(
OVDiffusionPipeline, OVTextualInversionLoaderMixin, LatentConsistencyModelImg2ImgPipeline
):
"""
OpenVINO-powered stable diffusion pipeline corresponding to [diffusers.LatentConsistencyModelImg2ImgPipeline](https://huggingface.co/docs/diffusers/api/pipelines/stable_diffusion/latent_consistency_img2img#diffusers.LatentConsistencyModelImg2ImgPipeline).
"""
main_input_name = "image"
export_feature = "image-to-image"
auto_model_class = LatentConsistencyModelImg2ImgPipeline
class OVStableDiffusion3Pipeline(OVDiffusionPipeline, OVTextualInversionLoaderMixin, StableDiffusion3Pipeline):
main_input_name = "prompt"
export_feature = "text-to-image"
auto_model_class = StableDiffusion3Pipeline
class OVStableDiffusion3Img2ImgPipeline(
OVDiffusionPipeline, OVTextualInversionLoaderMixin, StableDiffusion3Img2ImgPipeline
):
main_input_name = "image"
export_feature = "image-to-image"
auto_model_class = StableDiffusion3Img2ImgPipeline
class OVStableDiffusion3InpaintPipeline(
OVDiffusionPipeline, OVTextualInversionLoaderMixin, StableDiffusion3InpaintPipeline
):
main_input_name = "image"
export_feature = "inpainting"
auto_model_class = StableDiffusion3InpaintPipeline
class OVFluxPipeline(OVDiffusionPipeline, OVTextualInversionLoaderMixin, FluxPipeline):
main_input_name = "prompt"
export_feature = "text-to-image"
auto_model_class = FluxPipeline
class OVFluxImg2ImgPipeline(OVDiffusionPipeline, OVTextualInversionLoaderMixin, FluxImg2ImgPipeline):
main_input_name = "image"
export_feature = "image-to-image"
auto_model_class = FluxImg2ImgPipeline
class OVFluxInpaintPipeline(OVDiffusionPipeline, OVTextualInversionLoaderMixin, FluxInpaintPipeline):
main_input_name = "image"
export_feature = "inpainting"
auto_model_class = FluxInpaintPipeline
class OVFluxFillPipeline(OVDiffusionPipeline, OVTextualInversionLoaderMixin, FluxFillPipeline):
main_input_name = "image"
export_feature = "inpainting"
auto_model_class = FluxFillPipeline
class OVSanaPipeline(OVDiffusionPipeline, OVTextualInversionLoaderMixin, SanaPipeline):
main_input_name = "prompt"
export_feature = "text-to-image"
auto_model_class = SanaPipeline
class OVSanaSprintPipeline(OVDiffusionPipeline, OVTextualInversionLoaderMixin, SanaSprintPipeline):
main_input_name = "prompt"
export_feature = "text-to-image"
auto_model_class = SanaSprintPipeline
class OVLTXPipeline(OVDiffusionPipeline, OVTextualInversionLoaderMixin, LTXPipeline):
main_input_name = "prompt"
export_feature = "text-to-video"
auto_model_class = LTXPipeline
SUPPORTED_OV_PIPELINES = [
OVStableDiffusionPipeline,
OVStableDiffusionImg2ImgPipeline,
OVStableDiffusionInpaintPipeline,
OVStableDiffusionXLPipeline,
OVStableDiffusionXLImg2ImgPipeline,
OVStableDiffusionXLInpaintPipeline,
OVLatentConsistencyModelPipeline,
OVLatentConsistencyModelImg2ImgPipeline,
]
def _get_ov_class(pipeline_class_name: str, throw_error_if_not_exist: bool = True):
for ov_pipeline_class in SUPPORTED_OV_PIPELINES:
if (
ov_pipeline_class.__name__ == pipeline_class_name
or ov_pipeline_class.auto_model_class.__name__ == pipeline_class_name
):
return ov_pipeline_class
if throw_error_if_not_exist:
raise ValueError(f"OVDiffusionPipeline can't find a pipeline linked to {pipeline_class_name}")
OV_TEXT2IMAGE_PIPELINES_MAPPING = OrderedDict(
[
("stable-diffusion", OVStableDiffusionPipeline),
("stable-diffusion-xl", OVStableDiffusionXLPipeline),
("latent-consistency", OVLatentConsistencyModelPipeline),
]
)
OV_IMAGE2IMAGE_PIPELINES_MAPPING = OrderedDict(
[
("stable-diffusion", OVStableDiffusionImg2ImgPipeline),
("stable-diffusion-xl", OVStableDiffusionXLImg2ImgPipeline),
("latent-consistency", OVLatentConsistencyModelImg2ImgPipeline),
]
)
OV_INPAINT_PIPELINES_MAPPING = OrderedDict(
[
("stable-diffusion", OVStableDiffusionInpaintPipeline),
("stable-diffusion-xl", OVStableDiffusionXLInpaintPipeline),
]
)
OV_TEXT2VIDEO_PIPELINES_MAPPING = OrderedDict()
if is_diffusers_version(">=", "0.32") and is_transformers_version(">=", "4.45.0"):
OV_TEXT2VIDEO_PIPELINES_MAPPING["ltx-video"] = OVLTXPipeline
SUPPORTED_OV_PIPELINES.append(OVLTXPipeline)
if is_diffusers_version(">=", "0.29.0"):
SUPPORTED_OV_PIPELINES.extend(
[
OVStableDiffusion3Pipeline,
OVStableDiffusion3Img2ImgPipeline,
]
)
OV_TEXT2IMAGE_PIPELINES_MAPPING["stable-diffusion-3"] = OVStableDiffusion3Pipeline
OV_IMAGE2IMAGE_PIPELINES_MAPPING["stable-diffusion-3"] = OVStableDiffusion3Img2ImgPipeline
if is_diffusers_version(">=", "0.30.0"):
SUPPORTED_OV_PIPELINES.extend([OVStableDiffusion3InpaintPipeline, OVFluxPipeline])
OV_INPAINT_PIPELINES_MAPPING["stable-diffusion-3"] = OVStableDiffusion3InpaintPipeline
OV_TEXT2IMAGE_PIPELINES_MAPPING["flux"] = OVFluxPipeline
if is_diffusers_version(">=", "0.31.0"):
SUPPORTED_OV_PIPELINES.extend([OVFluxImg2ImgPipeline, OVFluxInpaintPipeline])
OV_INPAINT_PIPELINES_MAPPING["flux"] = OVFluxInpaintPipeline
OV_IMAGE2IMAGE_PIPELINES_MAPPING["flux"] = OVFluxImg2ImgPipeline
if is_diffusers_version(">=", "0.32.0"):
OV_INPAINT_PIPELINES_MAPPING["flux-fill"] = OVFluxFillPipeline
SUPPORTED_OV_PIPELINES.append(OVFluxFillPipeline)
OV_TEXT2IMAGE_PIPELINES_MAPPING["sana"] = OVSanaPipeline
SUPPORTED_OV_PIPELINES.append(OVSanaPipeline)
if is_diffusers_version(">=", "0.33.0"):
SUPPORTED_OV_PIPELINES.append(OVSanaSprintPipeline)
OV_TEXT2IMAGE_PIPELINES_MAPPING["sana-sprint"] = OVSanaSprintPipeline
SUPPORTED_OV_PIPELINES_MAPPINGS = [
OV_TEXT2IMAGE_PIPELINES_MAPPING,
OV_IMAGE2IMAGE_PIPELINES_MAPPING,
OV_INPAINT_PIPELINES_MAPPING,
OV_TEXT2VIDEO_PIPELINES_MAPPING,
]
def _get_task_ov_class(mapping, pipeline_class_name):
def _get_model_name(pipeline_class_name):
for ov_pipelines_mapping in SUPPORTED_OV_PIPELINES_MAPPINGS:
for model_name, ov_pipeline_class in ov_pipelines_mapping.items():
if (
ov_pipeline_class.__name__ == pipeline_class_name
or ov_pipeline_class.auto_model_class.__name__ == pipeline_class_name
):
return model_name
model_name = _get_model_name(pipeline_class_name)
if model_name is not None:
task_class = mapping.get(model_name, None)
if task_class is not None:
return task_class
raise ValueError(f"OVPipelineForTask can't find a pipeline linked to {pipeline_class_name} for {model_name}")
class OVPipelineForTask(ConfigMixin):
auto_model_class = DiffusionPipeline
config_name = "model_index.json"
@classmethod
@validate_hf_hub_args
def from_pretrained(cls, pretrained_model_or_path, **kwargs):
load_config_kwargs = {
"force_download": kwargs.get("force_download", False),
"resume_download": kwargs.get("resume_download", None),
"local_files_only": kwargs.get("local_files_only", False),
"cache_dir": kwargs.get("cache_dir", None),
"revision": kwargs.get("revision", None),
"proxies": kwargs.get("proxies", None),
"token": kwargs.get("token", None),
}
config = cls.load_config(pretrained_model_or_path, **load_config_kwargs)
config = config[0] if isinstance(config, tuple) else config
class_name = config["_class_name"]
ov_pipeline_class = _get_task_ov_class(cls.ov_pipelines_mapping, class_name)
return ov_pipeline_class.from_pretrained(pretrained_model_or_path, **kwargs)
class OVPipelineForText2Image(OVPipelineForTask):
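    """
    Auto pipeline that reads `model_index.json` and instantiates the matching OpenVINO text-to-image pipeline.

    Example (a minimal usage sketch; the checkpoint id is illustrative):

    ```python
    from optimum.intel import OVPipelineForText2Image

    pipeline = OVPipelineForText2Image.from_pretrained("stabilityai/stable-diffusion-xl-base-1.0", export=True)
    image = pipeline("a watercolor fox").images[0]
    ```
    """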
auto_model_class = AutoPipelineForText2Image
ov_pipelines_mapping = OV_TEXT2IMAGE_PIPELINES_MAPPING
export_feature = "text-to-image"
class OVPipelineForImage2Image(OVPipelineForTask):
auto_model_class = AutoPipelineForImage2Image
ov_pipelines_mapping = OV_IMAGE2IMAGE_PIPELINES_MAPPING
export_feature = "image-to-image"
class OVPipelineForInpainting(OVPipelineForTask):
auto_model_class = AutoPipelineForInpainting
ov_pipelines_mapping = OV_INPAINT_PIPELINES_MAPPING
export_feature = "inpainting"
class OVPipelineForText2Video(OVPipelineForTask):
auto_model_class = DiffusionPipeline
ov_pipelines_mapping = OV_TEXT2VIDEO_PIPELINES_MAPPING
export_feature = "text-to-video"