backend/models/postgis/team.py (165 lines of code) (raw):
from backend import db
from backend.models.dtos.team_dto import (
TeamDTO,
NewTeamDTO,
TeamMembersDTO,
TeamProjectDTO,
)
from backend.models.dtos.organisation_dto import OrganisationTeamsDTO
from backend.models.postgis.organisation import Organisation
from backend.models.postgis.statuses import (
TeamVisibility,
TeamMemberFunctions,
TeamRoles,
)
from backend.models.postgis.user import User
from backend.models.postgis.utils import NotFound
class TeamMembers(db.Model):
    """ Association row linking one user to one team """

    __tablename__ = "team_members"

    # (team_id, user_id) together form the composite primary key
    team_id = db.Column(
        db.Integer,
        db.ForeignKey("teams.id", name="fk_teams"),
        primary_key=True,
    )
    user_id = db.Column(
        db.BigInteger,
        db.ForeignKey("users.id", name="fk_users"),
        primary_key=True,
    )
    # Integer code of the member's function; code in this module stores
    # TeamMemberFunctions values here (e.g. MANAGER)
    function = db.Column(db.Integer, nullable=False)
    # Falls back to False when not set explicitly
    active = db.Column(db.Boolean, default=False)

    # Relationships; each one also installs a backref on the other model
    # (User.teams and Team.members respectively)
    member = db.relationship(
        User,
        backref=db.backref("teams", cascade="all, delete-orphan"),
    )
    team = db.relationship(
        "Team",
        backref=db.backref("members", cascade="all, delete-orphan"),
    )

    def create(self):
        """ Adds this membership to the session and commits it """
        session = db.session
        session.add(self)
        session.commit()

    def delete(self):
        """ Removes this membership via the session and commits """
        session = db.session
        session.delete(self)
        session.commit()
class Team(db.Model):
    """ Describes a team """

    __tablename__ = "teams"

    # Columns
    id = db.Column(db.Integer, primary_key=True)
    organisation_id = db.Column(
        db.Integer,
        db.ForeignKey("organisations.id", name="fk_organisations"),
        nullable=False,
    )
    name = db.Column(db.String(512), nullable=False)
    logo = db.Column(db.String)  # URL of a logo
    description = db.Column(db.String)
    invite_only = db.Column(db.Boolean, default=False, nullable=False)
    # Stored as the integer value of a TeamVisibility member
    visibility = db.Column(
        db.Integer, default=TeamVisibility.PUBLIC.value, nullable=False
    )

    # Relationships; also exposes the organisation's teams as Organisation.teams
    organisation = db.relationship(Organisation, backref="teams")

    def create(self):
        """ Creates and saves the current model to the DB """
        db.session.add(self)
        db.session.commit()

    @classmethod
    def create_from_dto(cls, new_team_dto: NewTeamDTO):
        """
        Creates a new team from a dto and saves it to the DB
        :param new_team_dto: DTO carrying name, description, invite_only,
            visibility (a TeamVisibility member name), organisation_id and creator
        :return: the newly created team
        :raises KeyError: if the visibility is not a TeamVisibility member name
        """
        new_team = cls()
        new_team.name = new_team_dto.name
        new_team.description = new_team_dto.description
        new_team.invite_only = new_team_dto.invite_only
        new_team.visibility = TeamVisibility[new_team_dto.visibility].value

        org = Organisation.get(new_team_dto.organisation_id)
        new_team.organisation = org

        # Create team member with creator as a manager
        new_member = TeamMembers()
        new_member.team = new_team
        new_member.user_id = new_team_dto.creator
        new_member.function = TeamMemberFunctions.MANAGER.value
        new_member.active = True
        new_team.members.append(new_member)

        new_team.create()
        return new_team

    def update(self, team_dto: TeamDTO):
        """
        Updates Team from DTO and commits the change
        :param team_dto: DTO with the new values; attributes that are None
            or that do not match a column of the teams table are ignored
        :raises NotFound: if a listed member's username matches no user
        :raises KeyError: if the visibility or a member's function is not a
            valid enum member name
        """
        if team_dto.organisation:
            self.organisation = Organisation().get_organisation_by_name(
                team_dto.organisation
            )

        column_names = set(self.__table__.columns.keys())
        for attr, value in team_dto.items():
            # Members and organisation are handled separately; None means
            # "leave the current value untouched"
            if attr in ("members", "organisation") or value is None:
                continue
            if attr not in column_names:
                continue
            if attr == "visibility":
                # The DTO carries the enum name, the column stores its value
                value = TeamVisibility[team_dto.visibility].value
            setattr(self, attr, value)

        # NOTE(review): _get_team_members() emits dicts keyed "username"
        # while the loop below reads "userName" -- confirm the shape of
        # team_dto.members, otherwise this comparison may never be equal.
        if team_dto.members and team_dto.members != self._get_team_members():
            # Replace the whole membership list with the one from the DTO
            for member in self.members:
                db.session.delete(member)

            for member in team_dto.members:
                user = User.get_by_username(member["userName"])
                if user is None:
                    raise NotFound("User not found")

                # "active" is not set here, so the column default applies
                new_team_member = TeamMembers()
                new_team_member.team = self
                new_team_member.member = user
                new_team_member.function = TeamMemberFunctions[
                    member["function"]
                ].value

        db.session.commit()

    def delete(self):
        """ Deletes the current model from the DB """
        db.session.delete(self)
        db.session.commit()

    def can_be_deleted(self) -> bool:
        """ A Team can be deleted if it doesn't have any projects """
        # "projects" is not declared in this class; presumably a backref
        # defined on another model -- TODO confirm
        return len(self.projects) == 0

    @staticmethod
    def get(team_id: int):
        """
        Gets specified team by id
        :param team_id: team ID in scope
        :return: Team if found otherwise None
        """
        return Team.query.get(team_id)

    @staticmethod
    def get_team_by_name(team_name: str):
        """
        Gets specified team by name
        :param team_name: team name in scope
        :return: Team if found otherwise None
        """
        return Team.query.filter_by(name=team_name).one_or_none()

    def as_dto(self):
        """ Returns a dto for the team """
        team_dto = TeamDTO()
        team_dto.team_id = self.id
        team_dto.description = self.description
        team_dto.invite_only = self.invite_only
        team_dto.members = self._get_team_members()
        team_dto.name = self.name
        team_dto.organisation = self.organisation.name
        team_dto.organisation_id = self.organisation.id
        # NOTE(review): this is the organisation's logo, not the team's own
        # "logo" column -- confirm that is intended
        team_dto.logo = self.organisation.logo
        team_dto.visibility = TeamVisibility(self.visibility).name
        return team_dto

    def as_dto_inside_org(self):
        """ Returns a dto for the team as listed inside its organisation """
        team_dto = OrganisationTeamsDTO()
        team_dto.team_id = self.id
        team_dto.name = self.name
        team_dto.description = self.description
        team_dto.invite_only = self.invite_only
        team_dto.members = self._get_team_members()
        team_dto.visibility = TeamVisibility(self.visibility).name
        return team_dto

    def as_dto_team_member(self, member) -> TeamMembersDTO:
        """
        Returns a dto for the team member
        :param member: object exposing user_id, function and active
        """
        member_dto = TeamMembersDTO()
        user = User.get_by_id(member.user_id)
        member_function = TeamMemberFunctions(member.function).name
        member_dto.username = user.username
        member_dto.function = member_function
        member_dto.picture_url = user.picture_url
        member_dto.active = member.active
        return member_dto

    def as_dto_team_project(self, project) -> TeamProjectDTO:
        """
        Returns a dto for the team project
        :param project: object exposing name, project_id and role
        """
        project_team_dto = TeamProjectDTO()
        project_team_dto.project_name = project.name
        project_team_dto.project_id = project.project_id
        project_team_dto.role = TeamRoles(project.role).name
        return project_team_dto

    def _get_team_members(self):
        """ Helper to get JSON serialized members """
        members = []
        for mem in self.members:
            members.append(
                {
                    "username": mem.member.username,
                    "pictureUrl": mem.member.picture_url,
                    "function": TeamMemberFunctions(mem.function).name,
                    "active": mem.active,
                }
            )
        return members

    def get_team_managers(self):
        """ Returns the active members of this team whose function is MANAGER """
        return TeamMembers.query.filter_by(
            team_id=self.id, function=TeamMemberFunctions.MANAGER.value, active=True
        ).all()