backend/services/team_service.py (450 lines of code) (raw):
from flask import current_app
from sqlalchemy import and_
from markdown import markdown
from backend import create_app, db
from backend.models.dtos.team_dto import (
TeamDTO,
NewTeamDTO,
TeamsListDTO,
ProjectTeamDTO,
TeamDetailsDTO,
)
from backend.models.dtos.message_dto import MessageDTO
from backend.models.postgis.message import Message, MessageType
from backend.models.postgis.team import Team, TeamMembers
from backend.models.postgis.project import ProjectTeams
from backend.models.postgis.project_info import ProjectInfo
from backend.models.postgis.utils import NotFound
from backend.models.postgis.statuses import (
TeamMemberFunctions,
TeamVisibility,
TeamRoles,
)
from backend.services.organisation_service import OrganisationService
from backend.services.users.user_service import UserService
from backend.services.messaging.message_service import MessageService
class TeamServiceError(Exception):
""" Custom Exception to notify callers an error occurred when handling teams """
def __init__(self, message):
if current_app:
current_app.logger.debug(message)
class TeamJoinNotAllowed(Exception):
""" Custom Exception to notify bad user level on joining team """
def __init__(self, message):
if current_app:
current_app.logger.debug(message)
class TeamService:
@staticmethod
def join_team(team_id: int, requesting_user: int, username: str, role: str = None):
is_manager = TeamService.is_user_team_manager(team_id, requesting_user)
team = TeamService.get_team_by_id(team_id)
user = UserService.get_user_by_username(username)
if TeamService.is_user_team_member(team.id, user.id):
raise TeamJoinNotAllowed(
"User is already a member of this team or has already requested to join"
)
if is_manager:
if role:
try:
role = TeamMemberFunctions[role.upper()].value
except KeyError:
raise Exception("Invalid TeamMemberFunction")
else:
role = TeamMemberFunctions.MEMBER.value
TeamService.add_team_member(team_id, user.id, role, True)
else:
if user.id != requesting_user:
raise TeamJoinNotAllowed("User not allowed to join team")
role = TeamMemberFunctions.MEMBER.value
# active if the team is open
if team.invite_only:
active = False
else:
active = True
TeamService.add_team_member(team_id, user.id, role, active)
if team.invite_only:
team_managers = team.get_team_managers()
for member in team_managers:
MessageService.send_request_to_join_team(
user.id, user.username, member.user_id, team.name, team_id
)
@staticmethod
def send_invite(team_id, from_user_id, username):
to_user = UserService.get_user_by_username(username)
from_user = UserService.get_user_by_id(from_user_id)
team = TeamService.get_team_by_id(team_id)
MessageService.send_invite_to_join_team(
from_user_id, from_user.username, to_user.id, team.name, team_id
)
@staticmethod
def accept_reject_join_request(team_id, from_user_id, username, function, action):
from_user = UserService.get_user_by_id(from_user_id)
to_user_id = UserService.get_user_by_username(username).id
team = TeamService.get_team_by_id(team_id)
MessageService.accept_reject_request_to_join_team(
from_user_id, from_user.username, to_user_id, team.name, team_id, action
)
is_member = TeamService.is_user_team_member(team_id, to_user_id)
if action == "accept":
if is_member:
TeamService.activate_team_member(team_id, to_user_id)
else:
TeamService.add_team_member(
team_id,
to_user_id,
TeamMemberFunctions[function.upper()].value,
True,
)
elif action == "reject":
if is_member:
TeamService.delete_invite(team_id, to_user_id)
else:
raise TeamServiceError("Invalid action type")
@staticmethod
def accept_reject_invitation_request(
team_id, from_user_id, username, function, action
):
from_user = UserService.get_user_by_id(from_user_id)
to_user = UserService.get_user_by_username(username)
team = TeamService.get_team_by_id(team_id)
team_members = team.get_team_managers()
for member in team_members:
MessageService.accept_reject_invitation_request_for_team(
from_user_id,
from_user.username,
member.user_id,
to_user.username,
team.name,
team_id,
action,
)
if action == "accept":
TeamService.add_team_member(
team_id, from_user_id, TeamMemberFunctions[function.upper()].value
)
@staticmethod
def add_team_member(team_id, user_id, function, active=False):
team_member = TeamMembers()
team_member.team_id = team_id
team_member.user_id = user_id
team_member.function = function
team_member.active = active
team_member.create()
@staticmethod
def leave_team(team_id, username):
user = UserService.get_user_by_username(username)
team_member = TeamMembers.query.filter(
TeamMembers.team_id == team_id, TeamMembers.user_id == user.id
).one()
team_member.delete()
@staticmethod
def add_team_project(team_id, project_id, role):
team_project = ProjectTeams()
team_project.project_id = project_id
team_project.team_id = team_id
team_project.role = TeamRoles[role].value
team_project.create()
@staticmethod
def delete_team_project(team_id, project_id):
project = ProjectTeams.query.filter(
and_(ProjectTeams.team_id == team_id, ProjectTeams.project_id == project_id)
).one()
project.delete()
@staticmethod
def get_all_teams(
user_id: int = None,
team_name_filter: str = None,
team_role_filter: str = None,
member_filter: int = None,
member_request_filter: int = None,
manager_filter: int = None,
organisation_filter: int = None,
omit_members: bool = False,
) -> TeamsListDTO:
query = db.session.query(Team)
orgs_query = None
is_admin = UserService.is_user_an_admin(user_id)
if organisation_filter:
orgs_query = query.filter(Team.organisation_id == organisation_filter)
if manager_filter and not (manager_filter == user_id and is_admin):
manager_teams = query.filter(
TeamMembers.user_id == manager_filter,
TeamMembers.active == True, # noqa
TeamMembers.function == TeamMemberFunctions.MANAGER.value,
Team.id == TeamMembers.team_id,
)
manager_orgs_teams = query.filter(
Team.organisation_id.in_(
[
org.id
for org in OrganisationService.get_organisations(manager_filter)
]
)
)
query = manager_teams.union(manager_orgs_teams)
if team_name_filter:
query = query.filter(
Team.name.ilike("%" + team_name_filter + "%"),
)
if team_role_filter:
try:
role = TeamRoles[team_role_filter.upper()].value
project_teams = (
db.session.query(ProjectTeams)
.filter(ProjectTeams.role == role)
.subquery()
)
query = query.join(project_teams)
except KeyError:
pass
if member_filter:
team_member = (
db.session.query(TeamMembers)
.filter(
TeamMembers.user_id == member_filter, TeamMembers.active.is_(True)
)
.subquery()
)
query = query.join(team_member)
if member_request_filter:
team_member = (
db.session.query(TeamMembers)
.filter(
TeamMembers.user_id == member_request_filter,
TeamMembers.active.is_(False),
)
.subquery()
)
query = query.join(team_member)
if orgs_query:
query = query.union(orgs_query)
teams_list_dto = TeamsListDTO()
for team in query.all():
team_dto = TeamDTO()
team_dto.team_id = team.id
team_dto.name = team.name
team_dto.invite_only = team.invite_only
team_dto.visibility = TeamVisibility(team.visibility).name
team_dto.description = team.description
team_dto.logo = team.organisation.logo
team_dto.organisation = team.organisation.name
team_dto.organisation_id = team.organisation.id
team_dto.members = []
is_team_member = TeamService.is_user_an_active_team_member(team.id, user_id)
# Skip if members are not included
if not omit_members:
team_members = team.members
team_dto.members = [
team.as_dto_team_member(member) for member in team_members
]
if team_dto.visibility == "PRIVATE" and not is_admin:
if is_team_member:
teams_list_dto.teams.append(team_dto)
else:
teams_list_dto.teams.append(team_dto)
return teams_list_dto
@staticmethod
def get_team_as_dto(
team_id: int, user_id: int, abbreviated: bool
) -> TeamDetailsDTO:
team = TeamService.get_team_by_id(team_id)
if team is None:
raise NotFound()
team_dto = TeamDetailsDTO()
team_dto.team_id = team.id
team_dto.name = team.name
team_dto.invite_only = team.invite_only
team_dto.visibility = TeamVisibility(team.visibility).name
team_dto.description = team.description
team_dto.logo = team.organisation.logo
team_dto.organisation = team.organisation.name
team_dto.organisation_id = team.organisation.id
team_dto.organisation_slug = team.organisation.slug
if user_id != 0:
if UserService.is_user_an_admin(user_id):
team_dto.is_general_admin = True
if OrganisationService.is_user_an_org_manager(
team.organisation.id, user_id
):
team_dto.is_org_admin = True
else:
team_dto.is_general_admin = False
team_dto.is_org_admin = False
if abbreviated:
return team_dto
team_dto.members = [team.as_dto_team_member(member) for member in team.members]
team_projects = TeamService.get_projects_by_team_id(team.id)
team_dto.team_projects = [
team.as_dto_team_project(project) for project in team_projects
]
return team_dto
@staticmethod
def get_projects_by_team_id(team_id: int):
projects = (
db.session.query(
ProjectInfo.name, ProjectTeams.project_id, ProjectTeams.role
)
.join(ProjectTeams, ProjectInfo.project_id == ProjectTeams.project_id)
.filter(ProjectTeams.team_id == team_id)
.all()
)
if projects is None:
raise NotFound()
return projects
@staticmethod
def get_project_teams_as_dto(project_id: int) -> TeamsListDTO:
""" Gets all the teams for a specified project """
project_teams = ProjectTeams.query.filter(
ProjectTeams.project_id == project_id
).all()
teams_list_dto = TeamsListDTO()
for project_team in project_teams:
team = TeamService.get_team_by_id(project_team.team_id)
team_dto = ProjectTeamDTO()
team_dto.team_id = project_team.team_id
team_dto.team_name = team.name
team_dto.role = project_team.role
teams_list_dto.teams.append(team_dto)
return teams_list_dto
@staticmethod
def change_team_role(team_id: int, project_id: int, role: str):
project = ProjectTeams.query.filter(
and_(ProjectTeams.team_id == team_id, ProjectTeams.project_id == project_id)
).one()
project.role = TeamRoles[role].value
project.save()
@staticmethod
def get_team_by_id(team_id: int) -> Team:
"""
Get team from DB
:param team_id: ID of team to fetch
:returns: Team
:raises: Not Found
"""
team = Team.get(team_id)
if team is None:
raise NotFound()
return team
@staticmethod
def get_team_by_name(team_name: str) -> Team:
team = Team.get_team_by_name(team_name)
if team is None:
raise NotFound()
return team
@staticmethod
def create_team(new_team_dto: NewTeamDTO) -> int:
"""
Creates a new team using a team dto
:param new_team_dto: Team DTO
:returns: ID of new Team
"""
TeamService.assert_validate_organisation(new_team_dto.organisation_id)
team = Team.create_from_dto(new_team_dto)
return team.id
@staticmethod
def update_team(team_dto: TeamDTO) -> Team:
"""
Updates a team
:param team_dto: DTO with updated info
:returns updated Team
"""
team = TeamService.get_team_by_id(team_dto.team_id)
team.update(team_dto)
return team
@staticmethod
def assert_validate_organisation(org_id: int):
""" Makes sure an organisation exists """
try:
OrganisationService.get_organisation_by_id(org_id)
except NotFound:
raise TeamServiceError(f"Organisation {org_id} does not exist")
@staticmethod
def assert_validate_members(team_dto: TeamDTO):
""" Validates that the users exist"""
if len(team_dto.members) == 0:
raise TeamServiceError("Must have at least one member")
members = []
managers = 0
for member in team_dto.members:
try:
UserService.get_user_by_username(member["name"])
except NotFound:
raise NotFound(f'User {member["name"]} does not exist')
if member["function"] == TeamMemberFunctions.MANAGER.name:
managers += 1
members.append(member)
if managers == 0:
raise TeamServiceError("Must have at least one manager in team")
team_dto.members = members
@staticmethod
def _get_team_members(team_id: int):
return TeamMembers.query.filter_by(team_id=team_id).all()
@staticmethod
def _get_active_team_members(team_id: int):
return TeamMembers.query.filter_by(team_id=team_id, active=True).all()
@staticmethod
def activate_team_member(team_id: int, user_id: int):
member = TeamMembers.query.filter(
TeamMembers.team_id == team_id, TeamMembers.user_id == user_id
).first()
member.active = True
db.session.add(member)
db.session.commit()
@staticmethod
def delete_invite(team_id: int, user_id: int):
member = TeamMembers.query.filter(
TeamMembers.team_id == team_id, TeamMembers.user_id == user_id
).first()
member.delete()
@staticmethod
def is_user_team_member(team_id: int, user_id: int):
query = TeamMembers.query.filter(
TeamMembers.team_id == team_id,
TeamMembers.user_id == user_id,
).exists()
return db.session.query(query).scalar()
@staticmethod
def is_user_an_active_team_member(team_id: int, user_id: int):
query = TeamMembers.query.filter(
TeamMembers.team_id == team_id,
TeamMembers.user_id == user_id,
TeamMembers.active.is_(True),
).exists()
return db.session.query(query).scalar()
@staticmethod
def is_user_team_manager(team_id: int, user_id: int):
# Admin manages all teams
team = Team.get(team_id)
if UserService.is_user_an_admin(user_id):
return True
managers = team.get_team_managers()
for member in managers:
if member.user_id == user_id:
return True
# Org admin manages teams attached to their org
user_managed_orgs = [
org.id for org in OrganisationService.get_organisations(user_id)
]
if team.organisation_id in user_managed_orgs:
return True
return False
@staticmethod
def delete_team(team_id: int):
""" Deletes a team """
team = TeamService.get_team_by_id(team_id)
if team.can_be_deleted():
team.delete()
else:
raise TeamServiceError("Team has projects, cannot be deleted")
@staticmethod
def check_team_membership(project_id: int, allowed_roles: list, user_id: int):
""" Given a project and permitted team roles, check user's membership in the team list """
teams_dto = TeamService.get_project_teams_as_dto(project_id)
teams_allowed = [
team_dto for team_dto in teams_dto.teams if team_dto.role in allowed_roles
]
user_membership = [
team_dto.team_id
for team_dto in teams_allowed
if TeamService.is_user_an_active_team_member(team_dto.team_id, user_id)
]
return len(user_membership) > 0
@staticmethod
def send_message_to_all_team_members(
team_id: int, team_name: str, message_dto: MessageDTO
):
"""Sends supplied message to all contributors in a team. Message all team members can take
over a minute to run, so this method is expected to be called on its own thread"""
app = (
create_app()
) # Because message-all run on background thread it needs it's own app context
with app.app_context():
team_members = TeamService._get_active_team_members(team_id)
sender = UserService.get_user_by_id(message_dto.from_user_id).username
message_dto.message = (
"A message from {}, manager of {} team:<br/><br/>{}".format(
MessageService.get_user_profile_link(sender),
MessageService.get_team_link(team_name, team_id, False),
markdown(message_dto.message, output_format="html"),
)
)
messages = []
for team_member in team_members:
if team_member.user_id != message_dto.from_user_id:
message = Message.from_dto(team_member.user_id, message_dto)
message.message_type = MessageType.TEAM_BROADCAST.value
message.save()
user = UserService.get_user_by_id(team_member.user_id)
messages.append(dict(message=message, user=user))
MessageService._push_messages(messages)