docker_images/diffusers/app/pipelines/image_to_image.py
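"""Image-to-image pipeline wrapper for the diffusers inference container.

Resolves a hub model's config to pick the matching diffusers pipeline class,
loads it (plus a Kandinsky prior when needed), and routes inference requests
with per-architecture argument handling.
"""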

import json
import logging
import os

import torch
from app import idle, offline, timing, validation
from app.pipelines import Pipeline
from diffusers import (
    AltDiffusionImg2ImgPipeline,
    AltDiffusionPipeline,
    AutoPipelineForImage2Image,
    ControlNetModel,
    DiffusionPipeline,
    DPMSolverMultistepScheduler,
    KandinskyImg2ImgPipeline,
    KandinskyPriorPipeline,
    StableDiffusionControlNetPipeline,
    StableDiffusionDepth2ImgPipeline,
    StableDiffusionImageVariationPipeline,
    StableDiffusionImg2ImgPipeline,
    StableDiffusionInstructPix2PixPipeline,
    StableDiffusionLatentUpscalePipeline,
    StableDiffusionPipeline,
    StableDiffusionUpscalePipeline,
    StableDiffusionXLImg2ImgPipeline,
    StableUnCLIPImg2ImgPipeline,
    StableUnCLIPPipeline,
)
from PIL import Image

logger = logging.getLogger(__name__)


class ImageToImagePipeline(Pipeline, offline.OfflineBestEffortMixin):
    def __init__(self, model_id: str):
        use_auth_token = os.getenv("HF_API_TOKEN")
        self.use_auth_token = use_auth_token
        # This should allow the image to work with private models when no token
        # is provided, as long as the model is already in the local cache
        self.offline_preferred = validation.str_to_bool(os.getenv("OFFLINE_PREFERRED"))
        model_data = self._hub_model_info(model_id)

        kwargs = (
            {"safety_checker": None}
            if model_id.startswith("hf-internal-testing/")
            else {}
        )
        env_dtype = os.getenv("TORCH_DTYPE")
        if env_dtype:
            kwargs["torch_dtype"] = getattr(torch, env_dtype)
        elif torch.cuda.is_available():
            kwargs["torch_dtype"] = torch.float16

        if model_id == "stabilityai/stable-diffusion-xl-refiner-1.0":
            kwargs["variant"] = "fp16"

        # Check whether this is a ControlNet or an SD/AD model
        config_file_name = None
        for file_name in ("config.json", "model_index.json"):
            if any(file.rfilename == file_name for file in model_data.siblings):
                config_file_name = file_name
                break

        if config_file_name:
            config_file = self._hub_repo_file(model_id, config_file_name)
            with open(config_file, "r") as f:
                config_dict = json.load(f)
            model_type = config_dict.get("_class_name", None)
        else:
            raise ValueError("Model type not found")

        # Load according to model type
        if model_type == "ControlNetModel":
            model_to_load = (
                model_data.cardData["base_model"]
                if "base_model" in model_data.cardData
                else "runwayml/stable-diffusion-v1-5"
            )

            controlnet = ControlNetModel.from_pretrained(
                model_id, use_auth_token=use_auth_token, **kwargs
            )
            self.ldm = StableDiffusionControlNetPipeline.from_pretrained(
                model_to_load,
                controlnet=controlnet,
                use_auth_token=use_auth_token,
                **kwargs,
            )
        elif model_type in ["AltDiffusionPipeline", "AltDiffusionImg2ImgPipeline"]:
            self.ldm = AltDiffusionImg2ImgPipeline.from_pretrained(
                model_id, use_auth_token=use_auth_token, **kwargs
            )
        elif model_type in [
            "StableDiffusionPipeline",
            "StableDiffusionImg2ImgPipeline",
        ]:
            self.ldm = StableDiffusionImg2ImgPipeline.from_pretrained(
                model_id, use_auth_token=use_auth_token, **kwargs
            )
        elif model_type in ["StableUnCLIPPipeline", "StableUnCLIPImg2ImgPipeline"]:
            self.ldm = StableUnCLIPImg2ImgPipeline.from_pretrained(
                model_id, use_auth_token=use_auth_token, **kwargs
            )
        elif model_type in [
            "StableDiffusionImageVariationPipeline",
            "StableDiffusionInstructPix2PixPipeline",
            "StableDiffusionUpscalePipeline",
            "StableDiffusionLatentUpscalePipeline",
            "StableDiffusionDepth2ImgPipeline",
        ]:
            self.ldm = DiffusionPipeline.from_pretrained(
                model_id, use_auth_token=use_auth_token, **kwargs
            )
        elif model_type in ["KandinskyImg2ImgPipeline", "KandinskyPipeline"]:
            model_to_load = "kandinsky-community/kandinsky-2-1-prior"
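            # Kandinsky is a two-stage system: the prior below maps the text
            # prompt to image embeddings, which the img2img decoder then
            # consumes alongside the input image (see the Kandinsky branch of
            # _process_req).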
            self.ldm = KandinskyImg2ImgPipeline.from_pretrained(
                model_id, use_auth_token=use_auth_token, **kwargs
            )
            self.prior = KandinskyPriorPipeline.from_pretrained(
                model_to_load, use_auth_token=use_auth_token, **kwargs
            )
        else:
            logger.debug("Falling back to generic auto pipeline loader")
            self.ldm = AutoPipelineForImage2Image.from_pretrained(
                model_id, use_auth_token=use_auth_token, **kwargs
            )

        if isinstance(
            self.ldm,
            (
                StableUnCLIPImg2ImgPipeline,
                StableUnCLIPPipeline,
                StableDiffusionPipeline,
                StableDiffusionImg2ImgPipeline,
                AltDiffusionPipeline,
                AltDiffusionImg2ImgPipeline,
                StableDiffusionControlNetPipeline,
                StableDiffusionInstructPix2PixPipeline,
                StableDiffusionImageVariationPipeline,
                StableDiffusionDepth2ImgPipeline,
            ),
        ):
            self.ldm.scheduler = DPMSolverMultistepScheduler.from_config(
                self.ldm.scheduler.config
            )

        if not idle.UNLOAD_IDLE:
            self._model_to_gpu()

    @timing.timing
    def _model_to_gpu(self):
        if torch.cuda.is_available():
            self.ldm.to("cuda")
            if isinstance(self.ldm, KandinskyImg2ImgPipeline):
                self.prior.to("cuda")

    def __call__(
        self, image: Image.Image, prompt: str = "", **kwargs
    ) -> "Image.Image":
        """
        Args:
            image (:obj:`PIL.Image.Image`):
                a condition image
            prompt (:obj:`str`):
                a string containing some text
        Return:
            A :obj:`PIL.Image.Image` with the raw image representation as PIL.
        """
        if idle.UNLOAD_IDLE:
            with idle.request_witnesses():
                self._model_to_gpu()
                # Forward the request kwargs so generation parameters actually
                # reach the underlying pipeline
                resp = self._process_req(image, prompt, **kwargs)
        else:
            resp = self._process_req(image, prompt, **kwargs)
        return resp

    def _process_req(self, image, prompt, **kwargs):
        # only one image per prompt is supported
        kwargs["num_images_per_prompt"] = 1
        if isinstance(
            self.ldm,
            (
                StableDiffusionPipeline,
                StableDiffusionImg2ImgPipeline,
                AltDiffusionPipeline,
                AltDiffusionImg2ImgPipeline,
                StableDiffusionControlNetPipeline,
                StableDiffusionInstructPix2PixPipeline,
                StableDiffusionUpscalePipeline,
                StableDiffusionLatentUpscalePipeline,
                StableDiffusionDepth2ImgPipeline,
            ),
        ):
            if "num_inference_steps" not in kwargs:
                kwargs["num_inference_steps"] = int(
                    os.getenv("DEFAULT_NUM_INFERENCE_STEPS", "25")
                )
            images = self.ldm(prompt, image, **kwargs)["images"]
            return images[0]
        elif isinstance(self.ldm, StableDiffusionXLImg2ImgPipeline):
            if "num_inference_steps" not in kwargs:
                kwargs["num_inference_steps"] = int(
                    os.getenv("DEFAULT_NUM_INFERENCE_STEPS", "25")
                )
            image = image.convert("RGB")
            images = self.ldm(prompt, image=image, **kwargs)["images"]
            return images[0]
        elif isinstance(self.ldm, (StableUnCLIPImg2ImgPipeline, StableUnCLIPPipeline)):
            if "num_inference_steps" not in kwargs:
                kwargs["num_inference_steps"] = int(
                    os.getenv("DEFAULT_NUM_INFERENCE_STEPS", "25")
                )
            # image comes first
            images = self.ldm(image, prompt, **kwargs)["images"]
            return images[0]
        elif isinstance(self.ldm, StableDiffusionImageVariationPipeline):
            if "num_inference_steps" not in kwargs:
                kwargs["num_inference_steps"] = int(
                    os.getenv("DEFAULT_NUM_INFERENCE_STEPS", "25")
                )
            # only the image is needed
            images = self.ldm(image, **kwargs)["images"]
            return images[0]
        elif isinstance(self.ldm, KandinskyImg2ImgPipeline):
            if "num_inference_steps" not in kwargs:
                kwargs["num_inference_steps"] = int(
                    os.getenv("DEFAULT_NUM_INFERENCE_STEPS", "100")
                )
            # not all args are supported by the prior
            prior_args = {
                "num_inference_steps": kwargs["num_inference_steps"],
                "num_images_per_prompt": kwargs["num_images_per_prompt"],
                "negative_prompt": kwargs.get("negative_prompt", None),
                "guidance_scale": kwargs.get("guidance_scale", 7),
            }
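            # Note: DEFAULT_GUIDANCE_SCALE only takes effect when the caller
            # did not pass guidance_scale explicitly; otherwise the prior keeps
            # the fallback of 7 set above and the decoder keeps its own default.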
            if "guidance_scale" not in kwargs:
                default_guidance_scale = os.getenv("DEFAULT_GUIDANCE_SCALE")
                if default_guidance_scale is not None:
                    kwargs["guidance_scale"] = float(default_guidance_scale)
                    prior_args["guidance_scale"] = float(default_guidance_scale)
                # Else, don't specify anything, leave the default behaviour
            image_emb, zero_image_emb = self.prior(prompt, **prior_args).to_tuple()
            images = self.ldm(
                prompt,
                image=image,
                image_embeds=image_emb,
                negative_image_embeds=zero_image_emb,
                **kwargs,
            )["images"]
            return images[0]
        else:
            raise ValueError("Model type not found or pipeline not implemented")
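

if __name__ == "__main__":
    # Hypothetical smoke test, not part of the upstream module: exercises the
    # wrapper end to end. The model id and file names are illustrative
    # assumptions; loading requires hub access (or a warm local cache) and may
    # need HF_API_TOKEN for private models.
    pipe = ImageToImagePipeline("runwayml/stable-diffusion-v1-5")
    source = Image.open("input.png").convert("RGB")
    result = pipe(source, prompt="a watercolor landscape")
    result.save("output.png")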