backend/services/messaging/message_service.py (577 lines of code) (raw):
import re
import time
import datetime
from cachetools import TTLCache, cached
from typing import List
from flask import current_app
from sqlalchemy import text, func
from markdown import markdown
from backend import create_app, db
from backend.models.dtos.message_dto import MessageDTO, MessagesDTO
from backend.models.dtos.stats_dto import Pagination
from backend.models.postgis.message import Message, MessageType, NotFound
from backend.models.postgis.notification import Notification
from backend.models.postgis.project import Project
from backend.models.postgis.task import TaskStatus, TaskAction, TaskHistory
from backend.models.postgis.statuses import TeamRoles
from backend.services.messaging.smtp_service import SMTPService
from backend.services.messaging.template_service import (
get_txt_template,
template_var_replacing,
clean_html,
)
from backend.services.users.user_service import UserService, User
message_cache = TTLCache(maxsize=512, ttl=30)
class MessageServiceError(Exception):
""" Custom Exception to notify callers an error occurred when handling mapping """
def __init__(self, message):
if current_app:
current_app.logger.debug(message)
class MessageService:
@staticmethod
def send_welcome_message(user: User):
"""Sends welcome message to new user at Sign up"""
org_code = current_app.config["ORG_CODE"]
text_template = get_txt_template("welcome_message_en.txt")
hot_welcome_section = get_txt_template("hot_welcome_section_en.txt")
replace_list = [
["[USERNAME]", user.username],
["[ORG_CODE]", org_code],
["[ORG_NAME]", current_app.config["ORG_NAME"]],
["[SETTINGS_LINK]", MessageService.get_user_settings_link()],
["[HOT_WELCOME]", hot_welcome_section if org_code == "HOT" else ""],
]
text_template = template_var_replacing(text_template, replace_list)
welcome_message = Message()
welcome_message.message_type = MessageType.SYSTEM.value
welcome_message.to_user_id = user.id
welcome_message.subject = "Welcome to the {} Tasking Manager".format(org_code)
welcome_message.message = text_template
welcome_message.save()
return welcome_message.id
@staticmethod
def send_message_after_validation(
status: int, validated_by: int, mapped_by: int, task_id: int, project_id: int
):
""" Sends mapper a notification after their task has been marked valid or invalid """
if validated_by == mapped_by:
return # No need to send a notification if you've verified your own task
user = UserService.get_user_by_id(mapped_by)
text_template = get_txt_template(
"invalidation_message_en.txt"
if status == TaskStatus.INVALIDATED
else "validation_message_en.txt"
)
status_text = (
"marked invalid" if status == TaskStatus.INVALIDATED else "validated"
)
task_link = MessageService.get_task_link(project_id, task_id)
replace_list = [
["[USERNAME]", user.username],
["[TASK_LINK]", task_link],
["[ORG_NAME]", current_app.config["ORG_NAME"]],
]
text_template = template_var_replacing(text_template, replace_list)
messages = []
validation_message = Message()
validation_message.message_type = (
MessageType.INVALIDATION_NOTIFICATION.value
if status == TaskStatus.INVALIDATED
else MessageType.VALIDATION_NOTIFICATION.value
)
validation_message.project_id = project_id
validation_message.task_id = task_id
validation_message.from_user_id = validated_by
validation_message.to_user_id = mapped_by
validation_message.subject = (
f"{task_link} mapped by you in Project {project_id} has been {status_text}"
)
validation_message.message = text_template
messages.append(dict(message=validation_message, user=user))
# For email alerts
MessageService._push_messages(messages)
@staticmethod
def send_message_to_all_contributors(project_id: int, message_dto: MessageDTO):
"""Sends supplied message to all contributors on specified project. Message all contributors can take
over a minute to run, so this method is expected to be called on its own thread"""
app = (
create_app()
) # Because message-all run on background thread it needs it's own app context
with app.app_context():
contributors = Message.get_all_contributors(project_id)
message_dto.message = "A message from {} managers:<br/><br/>{}".format(
MessageService.get_project_link(project_id),
markdown(message_dto.message, output_format="html"),
)
messages = []
for contributor in contributors:
message = Message.from_dto(contributor[0], message_dto)
message.message_type = MessageType.BROADCAST.value
message.project_id = project_id
user = UserService.get_user_by_id(contributor[0])
messages.append(dict(message=message, user=user))
MessageService._push_messages(messages)
@staticmethod
def _push_messages(messages):
if len(messages) == 0:
return
messages_objs = []
for i, message in enumerate(messages):
user = message.get("user")
obj = message.get("message")
# Store message in the database only if mentions option are disabled.
if (
user.mentions_notifications is False
and obj.message_type == MessageType.MENTION_NOTIFICATION.value
):
messages_objs.append(obj)
continue
if (
user.projects_notifications is False
and obj.message_type == MessageType.PROJECT_ACTIVITY_NOTIFICATION.value
):
continue
if (
user.projects_notifications is False
and obj.message_type == MessageType.BROADCAST.value
):
continue
if (
user.teams_notifications is False
and obj.message_type == MessageType.TEAM_BROADCAST.value
):
messages_objs.append(obj)
continue
if user.comments_notifications is False and obj.message_type in (
MessageType.TASK_COMMENT_NOTIFICATION.value,
MessageType.PROJECT_CHAT_NOTIFICATION.value,
):
continue
if user.tasks_notifications is False and obj.message_type in (
MessageType.VALIDATION_NOTIFICATION.value,
MessageType.INVALIDATION_NOTIFICATION.value,
):
messages_objs.append(obj)
continue
messages_objs.append(obj)
SMTPService.send_email_alert(
user.email_address,
user.username,
user.is_email_verified,
message["message"].id,
UserService.get_user_by_id(message["message"].from_user_id).username,
message["message"].project_id,
message["message"].task_id,
clean_html(message["message"].subject),
message["message"].message,
obj.message_type,
)
if i + 1 % 10 == 0:
time.sleep(0.5)
# Flush messages to the database.
if len(messages_objs) > 0:
db.session.add_all(messages_objs)
db.session.flush()
db.session.commit()
@staticmethod
def send_message_after_comment(
comment_from: int, comment: str, task_id: int, project_id: int
):
""" Will send a canned message to anyone @'d in a comment """
usernames = MessageService._parse_message_for_username(comment, project_id)
if len(usernames) != 0:
task_link = MessageService.get_task_link(project_id, task_id)
messages = []
for username in usernames:
try:
user = UserService.get_user_by_username(username)
except NotFound:
continue # If we can't find the user, keep going no need to fail
message = Message()
message.message_type = MessageType.MENTION_NOTIFICATION.value
message.project_id = project_id
message.task_id = task_id
message.from_user_id = comment_from
message.to_user_id = user.id
message.subject = f"You were mentioned in a comment in {task_link} of Project {project_id}"
message.message = comment
messages.append(dict(message=message, user=user))
MessageService._push_messages(messages)
# Notify all contributors except the user that created the comment.
results = (
TaskHistory.query.with_entities(TaskHistory.user_id.distinct())
.filter(TaskHistory.project_id == project_id)
.filter(TaskHistory.task_id == task_id)
.filter(TaskHistory.user_id != comment_from)
.filter(TaskHistory.action == TaskAction.STATE_CHANGE.name)
.all()
)
contributed_users = [r[0] for r in results]
if len(contributed_users) != 0:
user_from = User.query.get(comment_from)
if user_from is None:
raise ValueError("Username not found")
user_link = MessageService.get_user_link(user_from.username)
task_link = MessageService.get_task_link(project_id, task_id)
messages = []
for user_id in contributed_users:
try:
user = UserService.get_user_by_id(user_id)
# if user was mentioned, a message has already been sent to them,
# so we can skip
if user.username in usernames:
break
except NotFound:
continue # If we can't find the user, keep going no need to fail
message = Message()
message.message_type = MessageType.TASK_COMMENT_NOTIFICATION.value
message.project_id = project_id
message.from_user_id = comment_from
message.task_id = task_id
message.to_user_id = user.id
message.subject = (
f"{user_link} left a comment in {task_link} of Project {project_id}"
)
message.message = comment
messages.append(dict(message=message, user=user))
MessageService._push_messages(messages)
@staticmethod
def get_user_link(username: str):
base_url = current_app.config["APP_BASE_URL"]
return f'<a href="{base_url}/users/{username}">{username}</a>'
@staticmethod
def get_team_link(team_name: str, team_id: int, management: bool):
base_url = current_app.config["APP_BASE_URL"]
if management is True:
return f'<a href="{base_url}/manage/teams/{team_id}/">{team_name}</a>'
else:
return f'<a href="{base_url}/teams/{team_id}/membership/">{team_name}</a>'
@staticmethod
def send_request_to_join_team(
from_user: int, from_username: str, to_user: int, team_name: str, team_id: int
):
message = Message()
message.message_type = MessageType.REQUEST_TEAM_NOTIFICATION.value
message.from_user_id = from_user
message.to_user_id = to_user
message.subject = "{} requested to join {}".format(
MessageService.get_user_link(from_username),
MessageService.get_team_link(team_name, team_id, True),
)
message.message = "{} has requested to join the {} team.\
Access the team management page to accept or reject that request.".format(
MessageService.get_user_link(from_username),
MessageService.get_team_link(team_name, team_id, True),
)
message.add_message()
message.save()
@staticmethod
def accept_reject_request_to_join_team(
from_user: int,
from_username: str,
to_user: int,
team_name: str,
team_id: int,
response: str,
):
message = Message()
message.message_type = MessageType.REQUEST_TEAM_NOTIFICATION.value
message.from_user_id = from_user
message.to_user_id = to_user
message.subject = "Request to join {} was {}ed".format(
MessageService.get_team_link(team_name, team_id, False), response
)
message.message = "{} has {}ed your request to join the {} team.".format(
MessageService.get_user_link(from_username),
response,
MessageService.get_team_link(team_name, team_id, False),
)
message.add_message()
message.save()
@staticmethod
def accept_reject_invitation_request_for_team(
from_user: int,
from_username: str,
to_user: int,
sending_member: str,
team_name: str,
team_id: int,
response: str,
):
message = Message()
message.message_type = MessageType.INVITATION_NOTIFICATION.value
message.from_user_id = from_user
message.to_user_id = to_user
message.subject = "{} {}ed to join {}".format(
MessageService.get_user_link(from_username),
response,
MessageService.get_team_link(team_name, team_id, True),
)
message.message = "{} has {}ed {}'s invitation to join the {} team.".format(
MessageService.get_user_link(from_username),
response,
sending_member,
MessageService.get_team_link(team_name, team_id, True),
)
message.add_message()
message.save()
@staticmethod
def send_invite_to_join_team(
from_user: int, from_username: str, to_user: int, team_name: str, team_id: int
):
message = Message()
message.message_type = MessageType.INVITATION_NOTIFICATION.value
message.from_user_id = from_user
message.to_user_id = to_user
message.subject = "Invitation to join {}".format(
MessageService.get_team_link(team_name, team_id, False)
)
message.message = "{} has invited you to join the {} team.\
Access the {}'s page to accept or reject that invitation.".format(
MessageService.get_user_link(from_username),
MessageService.get_team_link(team_name, team_id, False),
MessageService.get_team_link(team_name, team_id, False),
)
message.add_message()
message.save()
@staticmethod
def send_message_after_chat(chat_from: int, chat: str, project_id: int):
""" Send alert to user if they were @'d in a chat message """
# Because message-all run on background thread it needs it's own app context
app = create_app()
with app.app_context():
usernames = MessageService._parse_message_for_username(chat, project_id)
if len(usernames) != 0:
link = MessageService.get_project_link(
project_id, include_chat_section=True
)
messages = []
for username in usernames:
current_app.logger.debug(f"Searching for {username}")
try:
user = UserService.get_user_by_username(username)
except NotFound:
current_app.logger.error(f"Username {username} not found")
continue # If we can't find the user, keep going no need to fail
message = Message()
message.message_type = MessageType.MENTION_NOTIFICATION.value
message.project_id = project_id
message.from_user_id = chat_from
message.to_user_id = user.id
message.subject = f"You were mentioned in {link} chat"
message.message = chat
messages.append(dict(message=message, user=user))
MessageService._push_messages(messages)
query = """ select user_id from project_favorites where project_id = :project_id"""
favorited_users_results = db.engine.execute(
text(query), project_id=project_id
)
favorited_users = [r[0] for r in favorited_users_results]
# Notify all contributors except the user that created the comment.
contributed_users_results = (
TaskHistory.query.with_entities(TaskHistory.user_id.distinct())
.filter(TaskHistory.project_id == project_id)
.filter(TaskHistory.user_id != chat_from)
.filter(TaskHistory.action == TaskAction.STATE_CHANGE.name)
.all()
)
contributed_users = [r[0] for r in contributed_users_results]
users_to_notify = list(set(contributed_users + favorited_users))
if len(users_to_notify) != 0:
from_user = User.query.get(chat_from)
from_user_link = MessageService.get_user_link(from_user.username)
project_link = MessageService.get_project_link(
project_id, include_chat_section=True
)
messages = []
for user_id in users_to_notify:
try:
user = UserService.get_user_by_id(user_id)
except NotFound:
continue # If we can't find the user, keep going no need to fail
message = Message()
message.message_type = MessageType.PROJECT_CHAT_NOTIFICATION.value
message.project_id = project_id
message.from_user_id = chat_from
message.to_user_id = user.id
message.subject = (
f"{from_user_link} left a comment in {project_link}"
)
message.message = chat
messages.append(dict(message=message, user=user))
# it's important to keep that line inside the if to avoid duplicated emails
MessageService._push_messages(messages)
@staticmethod
def send_favorite_project_activities(user_id: int):
current_app.logger.debug("Sending Favorite Project Activities")
favorited_projects = UserService.get_projects_favorited(user_id)
contributed_projects = UserService.get_projects_mapped(user_id)
if contributed_projects is None:
contributed_projects = []
for favorited_project in favorited_projects.favorited_projects:
contributed_projects.append(favorited_project.project_id)
recently_updated_projects = (
Project.query.with_entities(
Project.id, func.DATE(Project.last_updated).label("last_updated")
)
.filter(Project.id.in_(contributed_projects))
.filter(
func.DATE(Project.last_updated)
> datetime.date.today() - datetime.timedelta(days=300)
)
)
user = UserService.get_user_by_id(user_id)
messages = []
for project in recently_updated_projects:
activity_message = []
query_last_active_users = """ select distinct(user_id) from
(select user_id from task_history where project_id = :project_id
order by action_date desc limit 15 ) t """
last_active_users = db.engine.execute(
text(query_last_active_users), project_id=project.id
)
for recent_user_id in last_active_users:
recent_user_details = UserService.get_user_by_id(recent_user_id)
user_profile_link = MessageService.get_user_profile_link(
recent_user_details.username
)
activity_message.append(user_profile_link)
activity_message = str(activity_message)[1:-1]
project_link = MessageService.get_project_link(project.id)
message = Message()
message.message_type = MessageType.PROJECT_ACTIVITY_NOTIFICATION.value
message.project_id = project.id
message.to_user_id = user.id
message.subject = (
"Recent activities from your contributed/favorited Projects"
)
message.message = (
f"{activity_message} contributed to {project_link} recently"
)
messages.append(dict(message=message, user=user))
MessageService._push_messages(messages)
@staticmethod
def resend_email_validation(user_id: int):
""" Resends the email validation email to the logged in user """
user = UserService.get_user_by_id(user_id)
SMTPService.send_verification_email(user.email_address, user.username)
@staticmethod
def _get_managers(message: str, project_id: int) -> List[str]:
parser = re.compile(r"((?<=#)\w+|\[.+?\])")
parsed = parser.findall(message)
project = None
if "author" in parsed or "managers" in parsed:
project = Project.query.get(project_id)
if project is None:
return []
project_managers = [project.author.username]
if "managers" not in parsed:
return project_managers
teams = [t for t in project.teams if t.role == TeamRoles.PROJECT_MANAGER.value]
team_members = [
[u.member.username for u in t.team.members if u.active is True]
for t in teams
]
team_members = [item for sublist in team_members for item in sublist]
project_managers.extend(team_members)
return project_managers
@staticmethod
def _parse_message_for_username(message: str, project_id: int) -> List[str]:
""" Extracts all usernames from a comment looks for format @[user name] """
parser = re.compile(r"((?<=@)\w+|\[.+?\])")
usernames = []
for username in parser.findall(message):
username = username.replace("[", "", 1)
index = username.rfind("]")
username = username.replace("]", "", index)
usernames.append(username)
usernames.extend(MessageService._get_managers(message, project_id))
usernames = list(set(usernames))
return usernames
@staticmethod
@cached(message_cache)
def has_user_new_messages(user_id: int) -> dict:
""" Determines if the user has any unread messages """
count = Notification.get_unread_message_count(user_id)
new_messages = False
if count > 0:
new_messages = True
return dict(newMessages=new_messages, unread=count)
@staticmethod
def get_all_messages(
user_id: int,
locale: str,
page: int,
page_size=10,
sort_by=None,
sort_direction=None,
message_type=None,
from_username=None,
project=None,
task_id=None,
status=None,
):
""" Get all messages for user """
sort_column = Message.__table__.columns.get(sort_by)
if sort_column is None:
sort_column = Message.date
sort_column = (
sort_column.asc() if sort_direction.lower() == "asc" else sort_column.desc()
)
query = Message.query
if project is not None:
query = query.filter(Message.project_id == project)
if task_id is not None:
query = query.filter(Message.task_id == task_id)
if status in ["read", "unread"]:
query = query.filter(Message.read == (True if status == "read" else False))
if message_type:
message_type_filters = map(int, message_type.split(","))
query = query.filter(Message.message_type.in_(message_type_filters))
if from_username is not None:
query = query.join(Message.from_user).filter(
User.username.ilike(from_username + "%")
)
results = (
query.filter(Message.to_user_id == user_id)
.order_by(sort_column)
.paginate(page, page_size, True)
)
# if results.total == 0:
# raise NotFound()
messages_dto = MessagesDTO()
for item in results.items:
if isinstance(item, tuple):
message_dto = item[0].as_dto()
message_dto.project_title = item[1].name
else:
message_dto = item.as_dto()
if item.project_id is not None:
message_dto.project_title = item.project.get_project_title(locale)
messages_dto.user_messages.append(message_dto)
messages_dto.pagination = Pagination(results)
return messages_dto
@staticmethod
def get_message(message_id: int, user_id: int) -> Message:
""" Gets the specified message """
message = Message.query.get(message_id)
if message is None:
raise NotFound()
if message.to_user_id != int(user_id):
raise MessageServiceError(
f"User {user_id} attempting to access another users message {message_id}"
)
return message
@staticmethod
def get_message_as_dto(message_id: int, user_id: int):
""" Gets the selected message and marks it as read """
message = MessageService.get_message(message_id, user_id)
message.mark_as_read()
return message.as_dto()
@staticmethod
def delete_message(message_id: int, user_id: int):
""" Deletes the specified message """
message = MessageService.get_message(message_id, user_id)
message.delete()
@staticmethod
def delete_multiple_messages(message_ids: list, user_id: int):
""" Deletes the specified messages to the user """
Message.delete_multiple_messages(message_ids, user_id)
@staticmethod
def get_task_link(project_id: int, task_id: int, base_url=None) -> str:
""" Helper method that generates a link to the task """
if not base_url:
base_url = current_app.config["APP_BASE_URL"]
return f'<a href="{base_url}/projects/{project_id}/tasks/?search={task_id}">Task {task_id}</a>'
@staticmethod
def get_project_link(
project_id: int, base_url=None, include_chat_section=False
) -> str:
""" Helper method to generate a link to project chat"""
if not base_url:
base_url = current_app.config["APP_BASE_URL"]
if include_chat_section:
section = "#questionsAndComments"
else:
section = ""
return f'<a href="{base_url}/projects/{project_id}{section}">Project {project_id}</a>'
@staticmethod
def get_user_profile_link(user_name: str, base_url=None) -> str:
""" Helper method to generate a link to a user profile"""
if not base_url:
base_url = current_app.config["APP_BASE_URL"]
return f'<a href="{base_url}/users/{user_name}">{user_name}</a>'
@staticmethod
def get_user_settings_link(section=None, base_url=None) -> str:
""" Helper method to generate a link to a user profile"""
if not base_url:
base_url = current_app.config["APP_BASE_URL"]
return f'<a href="{base_url}/settings#{section}">User Settings</a>'