# bots/sdlc-slackbot/sdlc_slackbot/bot.py
import asyncio
import hashlib
import json
import os
import re
import threading
import time
import traceback
from logging import getLogger
import validate
import validators
from database import *
from gdoc import gdoc_get
from openai_slackbot.bot import init_bot, start_app
from openai_slackbot.utils.envvars import string
from peewee import *
from playhouse.db_url import *
from playhouse.shortcuts import model_to_dict
from sdlc_slackbot.config import get_config, load_config
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from slack_sdk import WebClient
from utils import *
logger = getLogger(__name__)
async def send_update_notification(input, response):
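    """Post to the configured notification channel that a project's linked
    resources changed, describing the new risk decision."""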
risk_str, confidence_str = risk_and_confidence_to_string(response)
risk_num = response["risk"]
confidence_num = response["confidence"]
msg = f"""
Project {input['project_name']} has been updated and has a new decision:
This new decision for the project is that it is: *{risk_str}({risk_num})* with *{confidence_str}({confidence_num})*. {response['justification']}."
"""
await app.client.chat_postMessage(channel=config.notification_channel_id, text=msg)
def hash_content(content):
return hashlib.sha256(content.encode("utf-8")).hexdigest()
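# Rough matcher for bare http(s) URLs; the (?!>) lookahead keeps a match from
# ending right before ">" (Slack wraps links as <url>). Candidates are further
# filtered with validators.url() in extract_urls().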
url_pat = re.compile(
r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+\b(?!>)"
)
def extract_urls(text):
logger.info(f"extracting urls from {text}")
urls = re.findall(url_pat, text)
return [url for url in urls if validators.url(url)]
async def async_fetch_slack(url):
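    """Fetch the text of a Slack thread from an archive permalink.

    The last two path segments are the channel ID and a "p"-prefixed timestamp
    (e.g. p1234567890123456), which is converted back into Slack's
    "<seconds>.<fraction>" ts format before calling conversations_replies.
    """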
parts = url.split("/")
channel = parts[-2]
ts = parts[-1]
ts = ts[1:] # trim p
seconds = ts[:-6]
nanoseconds = ts[-6:]
result = await app.client.conversations_replies(channel=channel, ts=f"{seconds}.{nanoseconds}")
return " ".join(message.get("text", "") for message in result.data.get("messages", []))
content_fetchers = [
(
lambda u: u.startswith(("https://docs.google.com/document", "docs.google.com/document")),
gdoc_get,
),
(lambda u: "slack.com/archives" in u, async_fetch_slack),
]
async def fetch_content(url):
for condition, fetcher in content_fetchers:
if condition(url):
if asyncio.iscoroutinefunction(fetcher):
return await fetcher(url) # Await the result if it's a coroutine function
else:
return fetcher(url) # Call it directly if it's not a coroutine function
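# Intake form blocks shown when the bot is mentioned or messaged directly.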
form = [
input_block(
"project_name",
"Project Name",
field("plain_text_input", "Enter the project name"),
),
input_block(
"project_description",
"Project Description",
field("plain_text_input", "Enter the project description", multiline=True),
),
input_block(
"links_to_resources",
"Links to Resources",
field("plain_text_input", "Enter links to resources", multiline=True),
),
input_block("point_of_contact", "Point of Contact", field("users_select", "Select a user")),
input_block(
"estimated_go_live_date",
"Estimated Go Live Date",
field("datepicker", "Select a date"),
),
submit_block("submit_form"),
]
def risk_and_confidence_to_string(decision):
# Lookup tables for risk and confidence
risk_lookup = {
(1, 2): "extremely low risk",
(3, 3): "low risk",
(4, 5): "medium risk",
(6, 7): "medium-high risk",
(8, 9): "high risk",
(10, 10): "critical risk",
}
confidence_lookup = {
(1, 2): "extremely low confidence",
(3, 3): "low confidence",
(4, 5): "medium confidence",
(6, 7): "medium-high confidence",
(8, 9): "high confidence",
(10, 10): "extreme confidence",
}
# Function to find the appropriate string from a lookup table
def find_in_lookup(value, lookup):
for (min_val, max_val), descriptor in lookup.items():
if min_val <= value <= max_val:
return descriptor
return "unknown"
# Convert risk and confidence using their respective lookup tables
risk_str = find_in_lookup(decision["risk"], risk_lookup)
confidence_str = find_in_lookup(decision["confidence"], confidence_lookup)
return risk_str, confidence_str
def decision_msg(response):
risk_str, confidence_str = risk_and_confidence_to_string(response)
risk_num = response["risk"]
confidence_num = response["confidence"]
return f"Thanks for your response! Based on this input, we've decided that this project is *{risk_str}({risk_num})* with *{confidence_str}({confidence_num})*. {response['justification']}."
skip_params = set(
[
"id",
"project_name",
"links_to_resources",
"point_of_contact",
"estimated_go_live_date",
]
)
multiple_whitespace_pat = re.compile(r"\s+")
def model_params_to_str(params):
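    """Join all non-skipped field values into a single whitespace-normalized
    string to use as model context."""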
ss = (v for k, v in params.items() if k not in skip_params)
return re.sub(multiple_whitespace_pat, " ", "\n".join(map(str, ss))).strip()
def summarize_params(params):
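    """Summarize each non-skipped field with the model so the combined context
    is more likely to fit within config.context_limit."""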
summary = {}
for k, v in params.items():
if k not in skip_params:
summary[k] = ask_ai(
config.base_prompt + config.summary_prompt, v[: config.context_limit]
)
else:
summary[k] = v
return summary
async def handle_app_mention_events(say, event):
logger.info("App mention event received:", event)
await say(blocks=form, thread_ts=event["ts"])
async def handle_message_events(say, message):
logger.info("message: ", message)
if message["channel_type"] == "im":
await say(blocks=form, thread_ts=message["ts"])
def get_response_with_retry(prompt, context, max_retries=1):
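    """Ask the model, retrying up to max_retries extra times if the reply is not
    valid JSON; returns an empty dict if every attempt fails."""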
prompt = prompt.strip().replace("\n", " ")
retries = 0
while retries <= max_retries:
try:
response = ask_ai(prompt, context)
return response
except json.JSONDecodeError as e:
logger.error(f"JSON error on attempt {retries + 1}: {e}")
retries += 1
if retries > max_retries:
return {}
def normalize_response(response):
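    """Coerce a model response into a list of dicts: a list of content blocks is
    JSON-decoded block by block, while a bare dict is wrapped in a list."""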
if isinstance(response, list):
return [json.loads(block.text) for block in response]
elif isinstance(response, dict):
return [response]
else:
raise TypeError("Unsupported response type")
def clean_normalized_response(normalized_responses):
"""
Remove the 'decision' key from each dictionary in a list of dictionaries.
Break it down into 'risk' and 'confidence'
:param normalized_responses: A list of dictionaries.
:return: The list of dictionaries with 'decision' key broken down.
"""
for response in normalized_responses:
if "decision" in response:
decision = response["decision"]
response["risk"] = decision.get("risk")
response["confidence"] = decision.get("confidence")
response.pop("decision", None)
return normalized_responses
async def submit_form(ack, body, say):
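    """Handle the intake form submission: validate input, create the Assessment,
    fetch linked resources, build (and if necessary summarize or truncate) the
    model context, then post either a decision or follow-up questions."""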
await ack()
try:
ts = body["container"]["message_ts"]
values = body["state"]["values"]
params = get_form_input(
values,
"project_name",
"project_description",
"links_to_resources",
"point_of_contact",
"estimated_go_live_date",
)
validate.required(params, "project_name", "project_description", "point_of_contact")
await say(text=config.reviewing_message, thread_ts=ts)
try:
assessment = Assessment.create(**params, user_id=body["user"]["id"])
except IntegrityError as e:
            raise validate.ValidationError("project_name", "must be unique") from e
resources = []
for url in extract_urls(params.get("links_to_resources", "")):
content = await fetch_content(url)
if content:
params[url] = content
resources.append(
dict(
assessment=assessment,
url=url,
content_hash=hash_content(content),
)
)
        if resources:
            Resource.insert_many(resources).execute()
context = model_params_to_str(params)
if len(context) > config.context_limit:
logger.info(f"context too long: {len(context)}. Summarizing...")
summarized_context = summarize_params(params)
context = model_params_to_str(summarized_context)
# FIXME: is there a better way to handle this? currently, if the summary is still too long
# we just give up and cut it off
if len(context) > config.context_limit:
logger.info(f"Summarized context too long: {len(context)}. Cutting off...")
context = context[: config.context_limit]
response = get_response_with_retry(config.base_prompt + config.initial_prompt, context)
if not response:
return
normalized_response = normalize_response(response)
clean_response = clean_normalized_response(normalized_response)
for item in clean_response:
if item["outcome"] == "decision":
                Assessment.update(**item).where(Assessment.id == assessment.id).execute()
await say(text=decision_msg(item), thread_ts=ts)
elif item["outcome"] == "followup":
db_questions = [dict(assessment=assessment, question=q) for q in item["questions"]]
Question.insert_many(db_questions).execute()
                followup_form = []
                for i, q in enumerate(item["questions"]):
                    followup_form.append(
                        input_block(
                            f"question_{i}",
                            q,
                            field("plain_text_input", "...", multiline=True),
                        )
                    )
                followup_form.append(submit_block(f"submit_followup_questions_{assessment.id}"))
                await say(blocks=followup_form, thread_ts=ts)
except validate.ValidationError as e:
await say(text=f"{e.field}: {e.issue}", thread_ts=ts)
    except Exception:
        traceback.print_exc()
await say(text=config.irrecoverable_error_message, thread_ts=ts)
async def submit_followup_questions(ack, body, say):
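    """Handle answers to follow-up questions: store them on the assessment's
    questions, rebuild the model context, and post the resulting decision."""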
await ack()
    ts = body["container"]["message_ts"]
    try:
        assessment_id = int(body["actions"][0]["action_id"].split("_")[-1])
assessment = Assessment.get(Assessment.id == assessment_id)
params = model_to_dict(assessment)
followup_questions = [q.question for q in assessment.questions]
except Exception as e:
logger.error(f"Failed to find params for user {body['user']['id']}", e)
await say(text=config.recoverable_error_message, thread_ts=ts)
return
try:
await say(text=config.reviewing_message, thread_ts=ts)
values = body["state"]["values"]
for i, q in enumerate(followup_questions):
params[q] = values[f"question_{i}"][f"question_{i}_input"]["value"]
for question in assessment.questions:
question.answer = params[question.question]
question.save()
context = model_params_to_str(params)
response = ask_ai(config.base_prompt, context)
text_to_update = response
if (
isinstance(response, dict)
and "text" in response
and "type" in response
and response["type"] == "text"
):
# Extract the text from the content block
            text_to_update = response["text"]
normalized_response = normalize_response(text_to_update)
clean_response = clean_normalized_response(normalized_response)
for item in clean_response:
if item["outcome"] == "decision":
                Assessment.update(**item).where(Assessment.id == assessment.id).execute()
await say(text=decision_msg(item), thread_ts=ts)
except Exception as e:
logger.error(f"error: {e} processing followup questions: {json.dumps(body, indent=2)}")
await say(text=config.irrecoverable_error_message, thread_ts=ts)
def update_resources():
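    """Background loop: periodically re-fetch every assessment's linked resources
    and, when content has changed, ask the model to re-assess and send an update
    notification."""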
while True:
time.sleep(monitor_thread_sleep_seconds)
try:
for assessment in Assessment.select():
logger.info(f"checking {assessment.project_name} for updates")
assessment_params = model_to_dict(assessment)
new_params = assessment_params.copy()
changed = False
previous_content = ""
for resource in assessment.resources:
                    new_content = asyncio.run(fetch_content(resource.url))
                    if not new_content:
                        continue
                    if resource.content_hash != hash_content(new_content):
                        # just save previous content in memory temporarily
                        previous_content = resource.content
                        resource.content = new_content
                        resource.content_hash = hash_content(new_content)
                        resource.save()
                        new_params[resource.url] = new_content
                        changed = True
if not changed:
continue
old_context = model_params_to_str(assessment_params)
new_context = model_params_to_str(new_params)
                context = {
                    "previous_context": old_context,
                    "previous_decision": {
                        "risk": assessment.risk,
                        "confidence": assessment.confidence,
                        "justification": assessment.justification,
                    },
                    "new_context": new_context,
                }
context_json = json.dumps(context, indent=2)
new_response = ask_ai(config.base_prompt + config.update_prompt, context_json)
if new_response["outcome"] == "unchanged":
continue
normalized_response = normalize_response(new_response)
clean_response = clean_normalized_response(normalized_response)
for item in clean_response:
assessment.update(**item).execute()
asyncio.run(send_update_notification(assessment_params, new_response))
except Exception as e:
logger.error(f"error: {e} updating resources")
traceback.print_exc()
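# How long the background monitor sleeps between passes over the assessments.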
monitor_thread_sleep_seconds = 6
if __name__ == "__main__":
current_dir = os.path.dirname(os.path.abspath(__file__))
load_config(os.path.join(current_dir, "config.toml"))
    template_path = os.path.join(current_dir, "templates")
config = get_config()
message_handler = []
action_handlers = []
view_submission_handlers = []
app = asyncio.run(
init_bot(
openai_organization_id=config.openai_organization_id,
slack_message_handler=message_handler,
slack_action_handlers=action_handlers,
slack_template_path=template_path,
)
)
# Register your custom event handlers
app.event("app_mention")(handle_app_mention_events)
app.message()(handle_message_events)
app.action("submit_form")(submit_form)
app.action(re.compile("submit_followup_questions.*"))(submit_followup_questions)
    t = threading.Thread(target=update_resources, daemon=True)
t.start()
# Start the app
asyncio.run(start_app(app))