operators.py (383 lines of code) (raw):
import os
import queue
import sys
import threading
import traceback
import bpy
from .backend import Backend
from .tools import LlamaMeshModelManager, ToolManager
from .utils import get_available_models, get_models_dir
STATUS_CANCELED = "Request canceled by user"
STATUS_DONE = "Agent ran out of steps"
class MESHGEN_OT_Chat(bpy.types.Operator):
bl_idname = "meshgen.chat"
bl_label = "Chat"
bl_description = "Chat with the selected model"
bl_options = {"REGISTER", "INTERNAL"}
def execute(self, context):
prefs = bpy.context.preferences.addons[__package__].preferences
props = context.scene.meshgen_props
backend = Backend.instance()
props.history.clear()
self.temperature = prefs.temperature
self.prompt = props.prompt
self.messages = [{"role": "user", "content": self.prompt}]
self.log_text = bpy.data.texts.get("meshgen log")
if self.log_text is None:
self.log_text = bpy.data.texts.new("meshgen log")
else:
self.log_text.clear()
for window in context.window_manager.windows:
for area in window.screen.areas:
if area.type == "TEXT_EDITOR":
for space in area.spaces:
if (
space.type == "TEXT_EDITOR"
and space.text == self.log_text
):
space.show_word_wrap = True
space.show_line_numbers = True
space.top = 0
break
log_open = False
for window in context.window_manager.windows:
for area in window.screen.areas:
if area.type != "TEXT_EDITOR":
continue
for space in area.spaces:
if space.type == "TEXT_EDITOR" and space.text == self.log_text:
log_open = True
break
if log_open:
break
if log_open:
break
if not log_open:
bpy.ops.screen.area_split(direction="VERTICAL")
new_area = context.screen.areas[-1]
new_area.type = "TEXT_EDITOR"
new_area.spaces.active.text = self.log_text
self.log_text.write("\n----- New Chat -----\n")
self.log_text.write(f"Prompt: {self.prompt}\n")
if not backend.is_loaded():
provider_to_model = {
"LOCAL": prefs.current_model,
"ollama": prefs.ollama_model_name,
"huggingface": prefs.huggingface_model_id,
"anthropic": prefs.anthropic_model_id,
"openai": prefs.openai_model_id,
}
model_name = (
provider_to_model["LOCAL"]
if prefs.backend_type == "LOCAL"
else provider_to_model.get(prefs.llm_provider)
)
self.add_event("LOADING", "Loading...", f"Loading {model_name}...")
try:
backend.load()
self.pop_event()
self.add_event(
"LOADING_SUCCESS", model_name, f"Finished loading {model_name}"
)
except Exception as e:
self.pop_event()
self.add_event(
"LOADING_ERROR",
str(e),
f"Error loading {model_name}:\n{traceback.format_exc()}",
)
return {"CANCELLED"}
self._stop_event = threading.Event()
self._output_queue = backend.start_chat_completion(
self.messages, self.temperature, self._stop_event
)
props.state = "RUNNING"
self.add_event("THINKING", None, "Thinking...")
wm = context.window_manager
self._timer = wm.event_timer_add(0.1, window=context.window)
wm.modal_handler_add(self)
return {"RUNNING_MODAL"}
def modal(self, context, event):
props = context.scene.meshgen_props
if props.state == "CANCELED" or event.type == "ESC":
self._stop_event.set()
self.add_event("CANCELED", STATUS_CANCELED, "Canceled")
return self.finish(context)
if event.type == "TIMER" and props.state == "RUNNING":
try:
while True:
message_type, content = self._output_queue.get_nowait()
if message_type == "STEP":
thought, full_output = content
self.pop_event()
self.add_event("STEP", thought, full_output)
self.add_event("THINKING", None, "Thinking...")
self.redraw(context)
elif message_type == "STEP_ERROR":
self.pop_event()
self.add_event("ERROR", content, f"Error: {content}")
self.add_event("THINKING", None, "Thinking...")
self.redraw(context)
elif message_type == "ERROR":
error_msg, full_error = content
self.pop_event()
self.add_event("ERROR", error_msg, full_error)
return self.finish(context)
elif message_type == "FINAL_ANSWER":
self.pop_event()
self.add_event("SUCCESS", content, f"Final Answer: {content}")
return self.finish(context)
elif message_type == "CANCELED":
self.pop_event()
self.add_event("CANCELED", STATUS_CANCELED, STATUS_CANCELED)
return self.finish(context)
elif message_type == "DONE":
self.pop_event()
self.add_event("ERROR", STATUS_DONE, STATUS_DONE)
return self.finish(context)
except queue.Empty:
pass
ToolManager.instance().process_tasks(context)
return {"PASS_THROUGH"}
def add_event(self, event_type, message, long_message):
props = bpy.context.scene.meshgen_props
event = props.history.add()
event.type = event_type
if message:
event.content = message
if long_message:
lines = str(long_message).split("\n")
for line in lines:
self.log_text.write(line + "\n")
def pop_event(self):
props = bpy.context.scene.meshgen_props
props.history.remove(len(props.history) - 1)
def finish(self, context):
props = context.scene.meshgen_props
props.state = "READY"
context.window_manager.event_timer_remove(self._timer)
self.redraw(context)
return {"FINISHED"}
def redraw(self, context):
for area in context.screen.areas:
if area.type == "VIEW_3D":
area.tag_redraw()
class MESHGEN_OT_CancelChat(bpy.types.Operator):
bl_idname = "meshgen.cancel_chat"
bl_label = "Cancel chat"
bl_description = "Cancel the current chat process"
bl_options = {"REGISTER", "INTERNAL"}
def execute(self, context):
props = context.scene.meshgen_props
props.state = "CANCELED"
return {"FINISHED"}
class MESHGEN_OT_OpenLog(bpy.types.Operator):
bl_idname = "meshgen.open_log"
bl_label = "Open log"
bl_description = "Open the log file"
bl_options = {"REGISTER", "INTERNAL"}
def execute(self, context):
log_text = bpy.data.texts.get("meshgen log")
if log_text is None:
log_text = bpy.data.texts.new("meshgen log")
for area in context.screen.areas:
if area.type == "TEXT_EDITOR":
area.spaces.active.text = log_text
return {"FINISHED"}
bpy.ops.screen.area_split(direction="VERTICAL")
new_area = context.screen.areas[-1]
new_area.type = "TEXT_EDITOR"
new_area.spaces.active.text = log_text
return {"FINISHED"}
class MESHGEN_OT_DownloadModel(bpy.types.Operator):
bl_idname = "meshgen.download_model"
bl_label = "Download model"
bl_description = "Download model from Hugging Face"
bl_options = {"REGISTER", "INTERNAL"}
__annotations__ = {
"repo_id": bpy.props.StringProperty(name="Repository ID"),
"filename": bpy.props.StringProperty(name="Filename"),
}
_timer = None
_download_thread = None
_progress_queue = None
def execute(self, context):
if self.filename in get_available_models():
self.report({"INFO"}, f"{self.filename} already downloaded")
return {"CANCELLED"}
import queue
import re
import sys
import threading
from huggingface_hub import hf_hub_download
prefs = context.preferences.addons[__package__].preferences
prefs.downloading = True
prefs.download_progress = 0
self._progress_queue = queue.Queue()
class TqdmCapture:
def __init__(self, queue, stream):
self.queue = queue
self.stream = stream
self.original_write = stream.write
self.original_flush = stream.flush
def write(self, string):
match = re.search(r"\r.*?(\d+)%", string)
if match:
try:
percentage = int(match.group(1))
self.queue.put(percentage)
except Exception as e:
self.queue.put(f"Error parsing progress: {string}, {e}")
self.original_write(string)
self.original_flush()
def flush(self):
self.original_flush()
def download_task():
try:
old_stderr = sys.stderr
sys.stderr = TqdmCapture(self._progress_queue, sys.stderr)
hf_hub_download(
self.repo_id,
filename=self.filename,
local_dir=get_models_dir(),
)
prefs = bpy.context.preferences.addons[__package__].preferences
if prefs.downloading:
self._progress_queue.put("finished")
except InterruptedError:
pass
except Exception as e:
prefs = bpy.context.preferences.addons[__package__].preferences
if prefs.downloading:
self._progress_queue.put(f"Error downloading model: {e}")
finally:
sys.stderr = old_stderr
self._download_thread = threading.Thread(target=download_task)
self._download_thread.start()
wm = context.window_manager
self._timer = wm.event_timer_add(0.1, window=context.window)
wm.modal_handler_add(self)
return {"RUNNING_MODAL"}
def modal(self, context, event):
if event.type == "TIMER":
prefs = context.preferences.addons[__package__].preferences
if not prefs.downloading:
self.report({"INFO"}, "Download canceled")
self.cleanup(context)
return {"CANCELLED"}
while not self._progress_queue.empty():
item = self._progress_queue.get()
if isinstance(item, (int, float)):
prefs.download_progress = item
for window in context.window_manager.windows:
for area in window.screen.areas:
if area.type == "PREFERENCES":
area.tag_redraw()
elif isinstance(item, str):
if item == "finished":
prefs.downloading = False
if len(get_available_models()) == 1:
prefs.current_model = get_available_models()[0]
self.report(
{"INFO"},
f"Successfully downloaded {self.filename} from {self.repo_id}",
)
self.cleanup(context)
return {"FINISHED"}
else:
self.report({"ERROR"}, item)
prefs.downloading = False
self.cleanup(context)
return {"CANCELLED"}
return {"RUNNING_MODAL"}
else:
return {"PASS_THROUGH"}
def cleanup(self, context):
prefs = context.preferences.addons[__package__].preferences
prefs.downloading = False
if self._progress_queue:
try:
while not self._progress_queue.empty():
self._progress_queue.get_nowait()
except Exception:
pass
self._progress_queue = None
if self._timer:
try:
bpy.context.window_manager.event_timer_remove(self._timer)
except Exception:
pass
self._timer = None
self._download_thread = None
for window in context.window_manager.windows:
for area in window.screen.areas:
if area.type == "PREFERENCES" or area.type == "VIEW_3D":
area.tag_redraw()
class MESHGEN_OT_SelectModel(bpy.types.Operator):
bl_idname = "meshgen.select_model"
bl_label = "Select model"
bl_description = "Select the model to use for generation"
bl_options = {"REGISTER", "INTERNAL"}
__annotations__ = {"model": bpy.props.StringProperty(name="Model")}
def execute(self, context):
prefs = context.preferences.addons[__package__].preferences
prefs.current_model = self.model
return {"FINISHED"}
class MESHGEN_OT_OpenModelsFolder(bpy.types.Operator):
bl_idname = "meshgen.open_models_folder"
bl_label = "Open models folder"
bl_description = "Open the folder containing downloaded models"
bl_options = {"REGISTER", "INTERNAL"}
def execute(self, context):
models_dir = get_models_dir()
if not models_dir.exists():
models_dir.mkdir(parents=True)
try:
if os.name == "nt":
os.startfile(models_dir)
elif os.name == "posix":
import subprocess
opener = "open" if sys.platform == "darwin" else "xdg-open"
subprocess.run([opener, models_dir])
except Exception as e:
self.report({"ERROR"}, f"Failed to open models folder: {e}")
return {"CANCELLED"}
return {"FINISHED"}
class MESHGEN_OT_LoadLlamaMesh(bpy.types.Operator):
bl_idname = "meshgen.load_llama_mesh"
bl_label = "Load LLaMA-Mesh"
bl_description = "Load the LLaMA-Mesh model"
bl_options = {"REGISTER", "INTERNAL"}
def execute(self, context):
model_manager = LlamaMeshModelManager.instance()
model_manager.load_model()
Backend.reset()
return {"FINISHED"}
class MESHGEN_OT_UnloadLlamaMesh(bpy.types.Operator):
bl_idname = "meshgen.unload_llama_mesh"
bl_label = "Unload LLaMA-Mesh"
bl_description = "Unload the LLaMA-Mesh model"
bl_options = {"REGISTER", "INTERNAL"}
def execute(self, context):
model_manager = LlamaMeshModelManager.instance()
model_manager.unload_model()
Backend.reset()
return {"FINISHED"}
def register():
bpy.utils.register_class(MESHGEN_OT_Chat)
bpy.utils.register_class(MESHGEN_OT_CancelChat)
bpy.utils.register_class(MESHGEN_OT_OpenLog)
bpy.utils.register_class(MESHGEN_OT_DownloadModel)
bpy.utils.register_class(MESHGEN_OT_SelectModel)
bpy.utils.register_class(MESHGEN_OT_OpenModelsFolder)
bpy.utils.register_class(MESHGEN_OT_LoadLlamaMesh)
bpy.utils.register_class(MESHGEN_OT_UnloadLlamaMesh)
def unregister():
bpy.utils.unregister_class(MESHGEN_OT_Chat)
bpy.utils.unregister_class(MESHGEN_OT_CancelChat)
bpy.utils.unregister_class(MESHGEN_OT_OpenLog)
bpy.utils.unregister_class(MESHGEN_OT_DownloadModel)
bpy.utils.unregister_class(MESHGEN_OT_SelectModel)
bpy.utils.unregister_class(MESHGEN_OT_OpenModelsFolder)
bpy.utils.unregister_class(MESHGEN_OT_LoadLlamaMesh)
bpy.utils.unregister_class(MESHGEN_OT_UnloadLlamaMesh)