app.py (121 lines of code) (raw):
import os
import re
import uuid
import logging
import urllib.parse
from typing import Optional, Tuple
import chainlit as cl
from orchestrator_client import call_orchestrator_stream
# Constants
UUID_REGEX = re.compile(
r'^\s*([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})\s+',
re.IGNORECASE
)
SUPPORTED_EXTENSIONS = [
"pdf", "bmp", "jpeg", "png", "tiff", "xlsx", "docx", "pptx",
"md", "txt", "html", "shtml", "htm", "py", "csv", "xml", "json", "vtt"
]
REFERENCE_REGEX = re.compile(
r'\[([^\]]+\.(?:' + '|'.join(SUPPORTED_EXTENSIONS) + r'))\]',
re.IGNORECASE
)
TERMINATE_TOKEN = "TERMINATE"
# Helpers
def read_env_boolean(var_name: str, default: bool = False) -> bool:
value = os.getenv(var_name, str(default)).strip().lower()
return value in {'true', '1', 'yes'}
def extract_conversation_id_from_chunk(chunk: str) -> Tuple[Optional[str], str]:
match = UUID_REGEX.match(chunk)
if match:
conv_id = match.group(1)
logging.info("[app] Extracted Conversation ID: %s", conv_id)
return conv_id, chunk[match.end():]
return None, chunk
def replace_source_reference_links(text: str) -> str:
def replacer(match):
source_file = match.group(1)
decoded = urllib.parse.unquote(source_file)
encoded = urllib.parse.quote(decoded)
return f"[{decoded}](/source/{encoded})"
return re.sub(REFERENCE_REGEX, replacer, text)
def check_authorization() -> dict:
app_user = cl.user_session.get("user")
if app_user:
metadata = app_user.metadata or {}
return {
'authorized': metadata.get('authorized', True),
'client_principal_id': metadata.get('client_principal_id', 'no-auth'),
'client_principal_name': metadata.get('client_principal_name', 'anonymous'),
'client_group_names': metadata.get('client_group_names', []),
'access_token': metadata.get('access_token')
}
return {
'authorized': True,
'client_principal_id': 'no-auth',
'client_principal_name': 'anonymous',
'client_group_names': [],
'access_token': None
}
# Check if authentication is enabled
ENABLE_AUTHENTICATION = read_env_boolean("ENABLE_AUTHENTICATION", False)
if ENABLE_AUTHENTICATION:
import auth
# Chainlit event handlers
@cl.on_chat_start
async def on_chat_start():
pass
# app_user = cl.user_session.get("user")
# if app_user:
# await cl.Message(content=f"Hello {app_user.metadata.get('user_name')}").send()
@cl.on_message
async def handle_message(message: cl.Message):
message.id = message.id or str(uuid.uuid4())
conversation_id = cl.user_session.get("conversation_id") or ""
response_msg = cl.Message(content="")
app_user = cl.user_session.get("user")
if app_user and not app_user.metadata.get('authorized', True):
await response_msg.stream_token(
"Oops! It looks like you don’t have access to this service. "
"If you think you should, please reach out to your administrator for help."
)
return
await response_msg.stream_token(" ")
buffer = ""
full_text = ""
references = set()
auth_info = check_authorization()
generator = call_orchestrator_stream(conversation_id, message.content, auth_info)
try:
async for chunk in generator:
# logging.info("[app] Chunk received: %s", chunk)
# Extract and update conversation ID
extracted_id, cleaned_chunk = extract_conversation_id_from_chunk(chunk)
if extracted_id:
conversation_id = extracted_id
cleaned_chunk = cleaned_chunk.replace("\\n", "\n")
# Track and clean references
found_refs = set(REFERENCE_REGEX.findall(cleaned_chunk))
if found_refs:
logging.info("[app] Found file references: %s", found_refs)
references.update(found_refs)
cleaned_chunk = REFERENCE_REGEX.sub("", cleaned_chunk)
buffer += cleaned_chunk
full_text += cleaned_chunk
# Handle TERMINATE token
token_index = buffer.find(TERMINATE_TOKEN)
if token_index != -1:
if token_index > 0:
await response_msg.stream_token(buffer[:token_index])
logging.info("[app] TERMINATE token detected. Draining remaining chunks...")
async for _ in generator:
pass # drain
break
# Stream safe part of buffer
safe_flush_length = len(buffer) - (len(TERMINATE_TOKEN) - 1)
if safe_flush_length > 0:
await response_msg.stream_token(buffer[:safe_flush_length])
buffer = buffer[safe_flush_length:]
except Exception as e:
error_message = (
"I'm sorry, I had a problem with the request. Please report the error. "
f"Details: {e}"
)
logging.exception("[app] Error during message handling.")
await response_msg.stream_token(error_message)
finally:
try:
await generator.aclose()
except RuntimeError as exc:
if "async generator ignored GeneratorExit" not in str(exc):
raise
cl.user_session.set("conversation_id", conversation_id)
await response_msg.update()
# Final reference handling and update
# references.update(REFERENCE_REGEX.findall(full_text))
# final_text = replace_source_reference_links(full_text.replace(TERMINATE_TOKEN, ""))
# response_msg.content = final_text
await response_msg.update()