vision/smolvlm2/smolvlm/conversation.py (302 lines of code) (raw):

"""Conversation templates and prompt construction for chat-style models."""

import base64
import dataclasses
import re
from enum import auto, Enum
from io import BytesIO
from typing import List, Any, Dict, Optional, Union, Tuple

from PIL import Image
from transformers import AutoTokenizer

# File extensions (lower-case) used to classify attachments by file name.
_IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp")
_VIDEO_EXTENSIONS = (".mp4", ".mov", ".avi", ".mkv", ".wmv", ".flv", ".mpeg", ".mpg")

# Values accepted by ``Conversation.process_image`` for ``image_process_mode``.
_IMAGE_PROCESS_MODES = ("Pad", "Default", "Crop", "Resize")


class SeparatorStyle(Enum):
    """Different separator style."""

    SINGLE = auto()
    TWO = auto()
    MPT = auto()
    PLAIN = auto()
    CHATML = auto()
    LLAMA_2 = auto()
    LLAMA_3 = auto()
    QWEN = auto()
    GEMMA = auto()


def _split_message(message):
    """Return ``(text, images)`` for a plain-string or tuple message.

    Tuple messages are laid out as ``(text, image_or_images, ...)``. A single
    attachment is wrapped in a list so callers can always take ``len(images)``;
    plain messages yield an empty image list.
    """
    if not isinstance(message, tuple):
        return message, []
    text = message[0]
    images = message[1] if len(message) > 1 else []
    if not isinstance(images, list):
        images = [images]
    return text, images


def _expand_to_square(pil_img, background_color=(122, 116, 104)):
    """Pad ``pil_img`` to a square canvas, centring it on ``background_color``."""
    width, height = pil_img.size
    if width == height:
        return pil_img
    side = max(width, height)
    canvas = Image.new(pil_img.mode, (side, side), background_color)
    if width > height:
        canvas.paste(pil_img, (0, (width - height) // 2))
    else:
        canvas.paste(pil_img, ((height - width) // 2, 0))
    return canvas


@dataclasses.dataclass
class Conversation:
    """A class that keeps all conversation history."""

    # System prompt placed at the start of the rendered prompt.
    system: str
    # Role labels; roles[0] is used for the user turn, roles[1] for the reply.
    roles: List[str]
    # [role, message] pairs. A message is either a string (or None for a turn
    # still to be generated) or a tuple (text, image_or_images, image_process_mode).
    messages: List[List[Any]]
    # Messages before this index are skipped by get_images / to_gradio_chatbot.
    offset: int
    sep_style: SeparatorStyle = SeparatorStyle.SINGLE
    sep: str = "###"
    sep2: Optional[str] = None
    version: str = "Unknown"

    tokenizer_id: str = ""
    # Only read by the LLAMA_3 style, which calls tokenizer.apply_chat_template.
    tokenizer: Any = None
    # Stop criteria (the default one is EOS token)
    stop_str: Optional[Union[str, List[str]]] = None
    # Stops generation if meeting any token in this list
    stop_token_ids: Optional[List[int]] = None

    # NOTE(review): not read anywhere in this module; presumably a UI flag.
    skip_next: bool = False

    def _messages_for_prompt(self):
        """Return the message list with a leading image message flattened to text.

        When the first message is a tuple, a shallow copy of the list is made
        and its first entry is replaced by text carrying the ``<image>``
        placeholder, so ``self.messages`` itself is never modified.
        """
        messages = self.messages
        if not messages or not isinstance(messages[0][1], tuple):
            return messages

        messages = list(messages)
        init_role, init_msg = messages[0]
        init_msg = init_msg[0]
        if "mmtag" in self.version:
            init_msg = init_msg.replace("<image>", "").strip()
            messages[0] = (init_role, init_msg)
            messages.insert(0, (self.roles[0], "<Image><image></Image>"))
            messages.insert(1, (self.roles[1], "Received."))
        elif not init_msg.startswith("<image>"):
            init_msg = init_msg.replace("<image>", "").strip()
            messages[0] = (init_role, "<image>\n" + init_msg)
        else:
            messages[0] = (init_role, init_msg)
        return messages

    def get_prompt(self):
        """Render the conversation as a single prompt string.

        The layout is selected by ``self.sep_style``. Messages that are empty
        or ``None`` are rendered as a bare role header (or skipped, depending
        on the style) so the model continues from there.

        Returns:
            The formatted prompt.

        Raises:
            ValueError: if ``sep_style`` is not handled here, or if the
                LLAMA_3 style is selected without a tokenizer.
        """
        messages = self._messages_for_prompt()
        style = self.sep_style

        if style == SeparatorStyle.SINGLE:
            ret = self.system + self.sep
            for role, message in messages:
                if message:
                    text, _ = _split_message(message)
                    ret += role + ": " + text + self.sep
                else:
                    ret += role + ":"
            return ret

        if style == SeparatorStyle.TWO:
            seps = [self.sep, self.sep2]
            ret = self.system + seps[0]
            for i, (role, message) in enumerate(messages):
                if message:
                    text, _ = _split_message(message)
                    ret += role + ": " + text + seps[i % 2]
                else:
                    ret += role + ":"
            return ret

        if style == SeparatorStyle.CHATML:
            ret = "" if self.system == "" else self.system + self.sep + "\n"
            for role, message in messages:
                if message:
                    text, images = _split_message(message)
                    # One placeholder per attached image, ahead of the text.
                    text = "<image>" * len(images) + text
                    ret += role + "\n" + text + self.sep + "\n"
                else:
                    ret += role + "\n"
            return ret

        if style == SeparatorStyle.LLAMA_3:
            if self.tokenizer is None:
                raise ValueError("Llama 3 tokenizer is not available. Make sure you have the necessary permissions.")
            chat_template_messages = [{"role": "system", "content": self.system}]
            for role, message in messages:
                if not message:
                    continue
                text, images = _split_message(message)
                text = "<image>" * len(images) + text
                chat_template_messages.append({"role": role, "content": text})
            return self.tokenizer.apply_chat_template(chat_template_messages, tokenize=False, add_generation_prompt=True)

        if style == SeparatorStyle.MPT:
            ret = self.system + self.sep
            for role, message in messages:
                if message:
                    text, _ = _split_message(message)
                    ret += role + text + self.sep
                else:
                    ret += role
            return ret

        if style == SeparatorStyle.GEMMA:
            ret = ""
            for i, (role, message) in enumerate(messages):
                assert role == self.roles[i % 2], "Conversation should alternate user/assistant/user/assistant/..."
                if message:
                    text, _ = _split_message(message)
                    ret += role + text + self.sep
                else:
                    ret += role
            return ret

        if style == SeparatorStyle.LLAMA_2:

            def wrap_sys(msg):
                return f"<<SYS>>\n{msg}\n<</SYS>>\n\n" if len(msg) > 0 else msg

            def wrap_inst(msg):
                return f"[INST] {msg} [/INST]"

            ret = ""
            for i, (role, message) in enumerate(messages):
                if i == 0:
                    assert message, "first message should not be none"
                    assert role == self.roles[0], "first message should come from user"
                if not message:
                    continue
                text, _ = _split_message(message)
                if i == 0:
                    text = wrap_sys(self.system) + text
                if i % 2 == 0:
                    ret += self.sep + wrap_inst(text)
                else:
                    ret += " " + text + " " + self.sep2
            # lstrip treats self.sep as a set of characters to drop from the left.
            return ret.lstrip(self.sep)

        if style == SeparatorStyle.PLAIN:
            seps = [self.sep, self.sep2]
            ret = self.system
            for i, (role, message) in enumerate(messages):
                if message:
                    text, _ = _split_message(message)
                    ret += text + seps[i % 2]
            return ret

        raise ValueError(f"Invalid style: {self.sep_style}")

    def append_message(self, role, message):
        """Append a ``[role, message]`` pair to the history."""
        self.messages.append([role, message])

    def process_image(self, image, image_process_mode, return_pil=False, image_format="PNG"):
        """Load, optionally pad/resize, and downscale an image.

        Args:
            image: a PIL image, or anything ``Image.open`` accepts (e.g. a path).
            image_process_mode: "Pad" (pad to square), "Resize" (force
                336x336), or "Default" / "Crop" (no pre-processing).
            return_pil: return the PIL image instead of a base64 string.
            image_format: encoding used when producing the base64 string.

        Returns:
            The processed PIL image, or its base64-encoded bytes as ``str``.

        Raises:
            ValueError: if ``image_process_mode`` is not one of the modes above.
        """
        if image_process_mode not in _IMAGE_PROCESS_MODES:
            raise ValueError(f"Invalid image_process_mode: {image_process_mode}")

        # Open paths/file objects before any geometry work so that "Pad" and
        # "Resize" also work for them; isinstance keeps already-loaded PIL
        # subclasses (e.g. PngImageFile) from being passed to Image.open.
        if not isinstance(image, Image.Image):
            image = Image.open(image).convert("RGB")

        if image_process_mode == "Pad":
            image = _expand_to_square(image)
        elif image_process_mode == "Resize":
            image = image.resize((336, 336))

        # Scale so the short edge is at most 448 and the long edge at most 672,
        # keeping the aspect ratio and never upscaling past the original size.
        max_hw, min_hw = max(image.size), min(image.size)
        aspect_ratio = max_hw / min_hw
        max_len, min_len = 672, 448
        shortest_edge = int(min(max_len / aspect_ratio, min_len, min_hw))
        longest_edge = int(shortest_edge * aspect_ratio)
        width, height = image.size
        if height > width:
            new_size = (shortest_edge, longest_edge)
        else:
            new_size = (longest_edge, shortest_edge)
        image = image.resize(new_size)

        if return_pil:
            return image
        buffered = BytesIO()
        image.save(buffered, format=image_format)
        return base64.b64encode(buffered.getvalue()).decode()

    def get_images(self, return_pil=False, return_path=False):
        """Collect the attachments of user turns at or after ``self.offset``.

        Args:
            return_pil: passed to ``process_image`` for image files.
            return_path: when true, return attachments untouched instead of
                processing image files.

        Returns:
            A flat list with one entry per attachment. Image files are run
            through ``process_image`` unless ``return_path`` is set; anything
            else is returned as stored.
        """
        images = []
        for i, (role, msg) in enumerate(self.messages[self.offset:]):
            # Only even positions (user turns) carry attachments.
            if i % 2 != 0 or not isinstance(msg, tuple):
                continue
            _, image, image_process_mode = msg
            attachments = image if isinstance(image, list) else [image]
            for img in attachments:
                if not return_path and self.is_image_file(img):
                    img = self.process_image(img, image_process_mode, return_pil=return_pil)
                # Append in both cases; processed images used to be dropped.
                images.append(img)
        return images

    def is_image_file(self, filename):
        """Return True if ``filename`` ends with a known image extension."""
        return filename.lower().endswith(_IMAGE_EXTENSIONS)

    def is_video_file(self, filename):
        """Return True if ``filename`` ends with a known video extension."""
        return filename.lower().endswith(_VIDEO_EXTENSIONS)

    def to_gradio_chatbot(self):
        """Convert the history from ``self.offset`` on into chatbot pairs.

        Returns:
            A list of ``[user_message, reply]`` entries. Image files are
            inlined into the user message as base64 ``<img>`` tags; video
            files are emitted as separate ``((path,), None)`` entries.
        """
        ret = []
        for i, (role, msg) in enumerate(self.messages[self.offset:]):
            if i % 2 == 1:
                # Reply turn: fill the second slot of the latest entry.
                ret[-1][-1] = msg
                continue
            if not isinstance(msg, tuple):
                ret.append([msg, None])
                continue

            text, image, _ = msg
            attachments = image if isinstance(image, list) else [image]
            if len(attachments) == 1:
                text = "<image>\n" + text.replace("<image>", "").strip()
            else:
                text = re.sub(r"(<image>)\n(?=<image>)", r"\1 ", text)

            img_str_list = []
            for img in attachments:
                if self.is_image_file(img):
                    img_b64_str = self.process_image(img, "Default", return_pil=False, image_format="JPEG")
                    img_str_list.append(
                        f'<img src="data:image/jpeg;base64,{img_b64_str}" style="max-width: 256px; max-height: 256px; width: auto; height: auto; object-fit: contain;"/>'
                    )
                elif self.is_video_file(img):
                    ret.append(((img,), None))

            text = text.strip()
            if img_str_list:
                img_place_holder = "".join(f"{img_str}\n\n" for img_str in img_str_list)
                text = f"{img_place_holder}\n\n{text}"
            if len(text) > 0:
                ret.append([text, None])
        return ret

    def copy(self):
        """Return a copy with an independent message list.

        The tokenizer and stop criteria are carried over so the copy can build
        prompts and stop generation exactly like the original; ``skip_next``
        starts from its default.
        """
        return Conversation(
            system=self.system,
            roles=self.roles,
            messages=[[x, y] for x, y in self.messages],
            offset=self.offset,
            sep_style=self.sep_style,
            sep=self.sep,
            sep2=self.sep2,
            version=self.version,
            tokenizer_id=self.tokenizer_id,
            tokenizer=self.tokenizer,
            stop_str=self.stop_str,
            stop_token_ids=self.stop_token_ids,
        )

    def dict(self):
        """Return the conversation state as a plain dictionary.

        When the conversation carries attachments, tuple messages are reduced
        to their text so no image data ends up in the result.
        """
        messages = self.messages
        # return_path=True only collects the attachments; nothing is decoded.
        if self.get_images(return_path=True):
            messages = [[role, msg[0] if isinstance(msg, tuple) else msg] for role, msg in self.messages]
        return {
            "system": self.system,
            "roles": self.roles,
            "messages": messages,
            "offset": self.offset,
            "sep": self.sep,
            "sep2": self.sep2,
        }


def safe_load_tokenizer(tokenizer_id):
    """Load a tokenizer with ``AutoTokenizer``, returning None on any failure."""
    try:
        return AutoTokenizer.from_pretrained(tokenizer_id)
    except Exception:
        # Deliberately best-effort: callers get None instead of an exception.
        return None


conv_qwen = Conversation(
    system="""<|im_start|>system
You are a helpful assistant.""",
    roles=("<|im_start|>user", "<|im_start|>assistant"),
    version="qwen",
    messages=[],
    offset=0,
    sep_style=SeparatorStyle.CHATML,
    sep="<|im_end|>",
)

conv_plain = Conversation(
    system="",
    roles=("", ""),
    messages=[],
    offset=0,
    sep_style=SeparatorStyle.PLAIN,
    sep="\n",
)

conv_chatml_direct = Conversation(
    system="""<|im_start|>system
Answer the questions.""",
    roles=("<|im_start|>user\n", "<|im_start|>assistant\n"),
    version="mpt",
    messages=[],
    offset=0,
    sep_style=SeparatorStyle.MPT,
    sep="<|im_end|>",
)

default_conversation = conv_plain

# Template registry; several names map to the same qwen/ChatML template.
conv_templates = {
    "default": conv_qwen,
    "plain": conv_plain,
    "chatml_direct": conv_chatml_direct,
    "qwen_1_5": conv_qwen,
    "qwen_2": conv_qwen,
}


if __name__ == "__main__":
    print(default_conversation.get_prompt())