backend/services/project_service.py (433 lines of code) (raw):
from cachetools import TTLCache, cached
from flask import current_app
from backend.models.dtos.mapping_dto import TaskDTOs
from backend.models.dtos.project_dto import (
ProjectDTO,
ProjectSummary,
ProjectStatsDTO,
ProjectUserStatsDTO,
ProjectContribsDTO,
ProjectContribDTO,
ProjectSearchResultsDTO,
)
from backend.models.postgis.organisation import Organisation
from backend.models.postgis.project import Project, ProjectStatus, MappingLevel
from backend.models.postgis.statuses import (
MappingNotAllowed,
ValidatingNotAllowed,
MappingPermission,
ValidationPermission,
TeamRoles,
)
from backend.models.postgis.task import Task, TaskHistory
from backend.models.postgis.utils import NotFound
from backend.services.users.user_service import UserService
from backend.services.project_search_service import ProjectSearchService
from backend.services.project_admin_service import ProjectAdminService
from backend.services.team_service import TeamService
from sqlalchemy import func, or_
from sqlalchemy.sql.expression import true
summary_cache = TTLCache(maxsize=1024, ttl=600)
class ProjectServiceError(Exception):
""" Custom Exception to notify callers an error occurred when handling projects """
def __init__(self, message):
if current_app:
current_app.logger.debug(message)
class ProjectService:
@staticmethod
def get_project_by_id(project_id: int) -> Project:
project = Project.get(project_id)
if project is None:
raise NotFound()
return project
@staticmethod
def exists(project_id: int) -> bool:
project = Project.exists(project_id)
if project is None:
raise NotFound()
return True
@staticmethod
def get_project_by_name(project_id: int) -> Project:
project = Project.get(project_id)
if project is None:
raise NotFound()
return project
@staticmethod
def auto_unlock_tasks(project_id: int):
Task.auto_unlock_tasks(project_id)
@staticmethod
def delete_tasks(project_id: int, tasks_ids):
# Validate project exists.
project = Project.get(project_id)
if project is None:
raise NotFound({"project": project_id})
tasks = [{"id": i, "obj": Task.get(i, project_id)} for i in tasks_ids]
# In case a task is not found.
not_found = [t["id"] for t in tasks if t["obj"] is None]
if len(not_found) > 0:
raise NotFound({"tasks": not_found})
# Delete task one by one.
[t["obj"].delete() for t in tasks]
@staticmethod
def get_contribs_by_day(project_id: int) -> ProjectContribsDTO:
# Validate that project exists
project = ProjectService.get_project_by_id(project_id)
# Fetch all state change with date and task ID
stats = (
TaskHistory.query.with_entities(
TaskHistory.action_text.label("action_text"),
func.DATE(TaskHistory.action_date).label("day"),
TaskHistory.task_id.label("task_id"),
)
.filter(TaskHistory.project_id == project_id)
.filter(
TaskHistory.action == "STATE_CHANGE",
or_(
TaskHistory.action_text == "MAPPED",
TaskHistory.action_text == "VALIDATED",
TaskHistory.action_text == "INVALIDATED",
),
)
.group_by("action_text", "day", "task_id")
.order_by("day")
).all()
contribs_dto = ProjectContribsDTO()
# Filter and store unique dates
dates = list(set(r[1] for r in stats))
dates.sort(
reverse=False
) # Why was this reversed? To have the dates in ascending order
dates_list = []
cumulative_mapped = 0
cumulative_validated = 0
# A hashmap to track task state change updates
tasks = {
"MAPPED": {"total": 0},
"VALIDATED": {"total": 0},
"INVALIDATED": {"total": 0},
}
for date in dates:
dto = ProjectContribDTO(
{
"date": date,
"mapped": 0,
"validated": 0,
"total_tasks": project.total_tasks,
}
)
# s -> ('LOCKED_FOR_MAPPING', datetime.date(2019, 4, 23), 1)
# s[0] -> action, s[1] -> date, s[2] -> task_id
values = [(s[0], s[2]) for s in stats if date == s[1]]
values.sort(reverse=True) # Most recent action comes first
for val in values:
task_id = val[1]
task_status = val[0]
if task_status == "MAPPED":
if task_id not in tasks["MAPPED"]:
tasks["MAPPED"][task_id] = 1
tasks["MAPPED"]["total"] += 1
dto.mapped += 1
elif task_status == "VALIDATED":
if task_id not in tasks["VALIDATED"]:
tasks["VALIDATED"][task_id] = 1
tasks["VALIDATED"]["total"] += 1
dto.validated += 1
if task_id in tasks["INVALIDATED"]:
del tasks["INVALIDATED"][task_id]
tasks["INVALIDATED"]["total"] -= 1
if task_id not in tasks["MAPPED"]:
tasks["MAPPED"][task_id] = 1
tasks["MAPPED"]["total"] += 1
dto.mapped += 1
else:
if task_id not in tasks["INVALIDATED"]:
tasks["INVALIDATED"][task_id] = 1
tasks["INVALIDATED"]["total"] += 1
if task_id in tasks["MAPPED"]:
del tasks["MAPPED"][task_id]
tasks["MAPPED"]["total"] -= 1
if dto.mapped > 0:
dto.mapped -= 1
if task_id in tasks["VALIDATED"]:
del tasks["VALIDATED"][task_id]
tasks["VALIDATED"]["total"] -= 1
if dto.validated > 0:
dto.validated -= 1
cumulative_mapped = tasks["MAPPED"]["total"]
cumulative_validated = tasks["VALIDATED"]["total"]
dto.cumulative_mapped = cumulative_mapped
dto.cumulative_validated = cumulative_validated
dates_list.append(dto)
contribs_dto.stats = dates_list
return contribs_dto
@staticmethod
def get_project_dto_for_mapper(
project_id, current_user_id, locale="en", abbrev=False
) -> ProjectDTO:
"""
Get the project DTO for mappers
:param project_id: ID of the Project mapper has requested
:param locale: Locale the mapper has requested
:raises ProjectServiceError, NotFound
"""
project = ProjectService.get_project_by_id(project_id)
# if project is public and is not draft, we don't need to check permissions
if not project.private and not project.status == ProjectStatus.DRAFT.value:
return project.as_dto_for_mapping(current_user_id, locale, abbrev)
is_allowed_user = True
is_team_member = None
is_manager_permission = False
if current_user_id:
is_manager_permission = (
ProjectAdminService.is_user_action_permitted_on_project(
current_user_id, project_id
)
)
# Draft Projects - admins, authors, org admins & team managers permitted
if project.status == ProjectStatus.DRAFT.value:
if not is_manager_permission:
is_allowed_user = False
raise ProjectServiceError("Unable to fetch project")
# Private Projects - allowed_users, admins, org admins &
# assigned teams (mappers, validators, project managers), authors permitted
if project.private and not is_manager_permission:
is_allowed_user = False
if current_user_id:
is_allowed_user = (
len(
[
user
for user in project.allowed_users
if user.id == current_user_id
]
)
> 0
)
if not (is_allowed_user or is_manager_permission):
if current_user_id:
allowed_roles = [
TeamRoles.MAPPER.value,
TeamRoles.VALIDATOR.value,
TeamRoles.PROJECT_MANAGER.value,
]
is_team_member = TeamService.check_team_membership(
project_id, allowed_roles, current_user_id
)
if is_allowed_user or is_manager_permission or is_team_member:
return project.as_dto_for_mapping(current_user_id, locale, abbrev)
else:
raise ProjectServiceError("Unable to fetch project")
@staticmethod
def get_project_tasks(
project_id,
task_ids_str: str,
order_by: str = None,
order_by_type: str = "ASC",
status: int = None,
):
project = ProjectService.get_project_by_id(project_id)
return project.tasks_as_geojson(task_ids_str, order_by, order_by_type, status)
@staticmethod
def get_project_aoi(project_id):
project = ProjectService.get_project_by_id(project_id)
return project.get_aoi_geometry_as_geojson()
@staticmethod
def get_project_priority_areas(project_id):
project = ProjectService.get_project_by_id(project_id)
geojson_areas = []
for priority_area in project.priority_areas:
geojson_areas.append(priority_area.get_as_geojson())
return geojson_areas
@staticmethod
def get_task_for_logged_in_user(user_id: int):
""" if the user is working on a task in the project return it """
tasks = Task.get_locked_tasks_for_user(user_id)
tasks_dto = tasks
return tasks_dto
@staticmethod
def get_task_details_for_logged_in_user(user_id: int, preferred_locale: str):
""" if the user is working on a task in the project return it """
tasks = Task.get_locked_tasks_details_for_user(user_id)
if len(tasks) == 0:
raise NotFound()
# TODO put the task details in to a DTO
dtos = []
for task in tasks:
dtos.append(task.as_dto_with_instructions(preferred_locale))
task_dtos = TaskDTOs()
task_dtos.tasks = dtos
return task_dtos
@staticmethod
def is_user_in_the_allowed_list(allowed_users: list, current_user_id: int):
"""For private projects, check if user is present in the allowed list"""
return (
len([user.id for user in allowed_users if user.id == current_user_id]) > 0
)
@staticmethod
def evaluate_mapping_permission(
project_id: int, user_id: int, mapping_permission: int
):
allowed_roles = [
TeamRoles.MAPPER.value,
TeamRoles.VALIDATOR.value,
TeamRoles.PROJECT_MANAGER.value,
]
is_team_member = TeamService.check_team_membership(
project_id, allowed_roles, user_id
)
# mapping_permission = 1(level),2(teams),3(teamsAndLevel)
if mapping_permission == MappingPermission.TEAMS.value:
if not is_team_member:
return False, MappingNotAllowed.USER_NOT_TEAM_MEMBER
elif mapping_permission == MappingPermission.LEVEL.value:
if not ProjectService._is_user_intermediate_or_advanced(user_id):
return False, MappingNotAllowed.USER_NOT_CORRECT_MAPPING_LEVEL
elif mapping_permission == MappingPermission.TEAMS_LEVEL.value:
if not ProjectService._is_user_intermediate_or_advanced(user_id):
return False, MappingNotAllowed.USER_NOT_CORRECT_MAPPING_LEVEL
if not is_team_member:
return False, MappingNotAllowed.USER_NOT_TEAM_MEMBER
@staticmethod
def is_user_permitted_to_map(project_id: int, user_id: int):
""" Check if the user is allowed to map the on the project in scope """
if UserService.is_user_blocked(user_id):
return False, MappingNotAllowed.USER_NOT_ON_ALLOWED_LIST
project = ProjectService.get_project_by_id(project_id)
if project.license_id:
if not UserService.has_user_accepted_license(user_id, project.license_id):
return False, MappingNotAllowed.USER_NOT_ACCEPTED_LICENSE
mapping_permission = project.mapping_permission
is_manager_permission = (
False # is_admin or is_author or is_org_manager or is_manager_team
)
if ProjectAdminService.is_user_action_permitted_on_project(user_id, project_id):
is_manager_permission = True
# Draft (public/private) accessible only for is_manager_permission
if (
ProjectStatus(project.status) == ProjectStatus.DRAFT
and not is_manager_permission
):
return False, MappingNotAllowed.PROJECT_NOT_PUBLISHED
is_restriction = None
if not is_manager_permission and mapping_permission:
is_restriction = ProjectService.evaluate_mapping_permission(
project_id, user_id, mapping_permission
)
tasks = Task.get_locked_tasks_for_user(user_id)
if len(tasks.locked_tasks) > 0:
return False, MappingNotAllowed.USER_ALREADY_HAS_TASK_LOCKED
is_allowed_user = None
if project.private and not is_manager_permission:
# Check if user is in allowed user list
is_allowed_user = ProjectService.is_user_in_the_allowed_list(
project.allowed_users, user_id
)
if is_allowed_user:
return True, "User allowed to map"
if not is_manager_permission and is_restriction:
return is_restriction
elif project.private and not (
is_manager_permission or is_allowed_user or not is_restriction
):
return False, MappingNotAllowed.USER_NOT_ON_ALLOWED_LIST
return True, "User allowed to map"
@staticmethod
def _is_user_intermediate_or_advanced(user_id):
""" Helper method to determine if user level is not beginner """
user_mapping_level = UserService.get_mapping_level(user_id)
if user_mapping_level not in [MappingLevel.INTERMEDIATE, MappingLevel.ADVANCED]:
return False
return True
@staticmethod
def evaluate_validation_permission(
project_id: int, user_id: int, validation_permission: int
):
allowed_roles = [TeamRoles.VALIDATOR.value, TeamRoles.PROJECT_MANAGER.value]
is_team_member = TeamService.check_team_membership(
project_id, allowed_roles, user_id
)
# validation_permission = 1(level),2(teams),3(teamsAndLevel)
if validation_permission == ValidationPermission.TEAMS.value:
if not is_team_member:
return False, ValidatingNotAllowed.USER_NOT_TEAM_MEMBER
elif validation_permission == ValidationPermission.LEVEL.value:
if not ProjectService._is_user_intermediate_or_advanced(user_id):
return False, ValidatingNotAllowed.USER_IS_BEGINNER
elif validation_permission == ValidationPermission.TEAMS_LEVEL.value:
if not ProjectService._is_user_intermediate_or_advanced(user_id):
return False, ValidatingNotAllowed.USER_IS_BEGINNER
if not is_team_member:
return False, ValidatingNotAllowed.USER_NOT_TEAM_MEMBER
@staticmethod
def is_user_permitted_to_validate(project_id, user_id):
""" Check if the user is allowed to validate on the project in scope """
if UserService.is_user_blocked(user_id):
return False, ValidatingNotAllowed.USER_NOT_ON_ALLOWED_LIST
project = ProjectService.get_project_by_id(project_id)
if project.license_id:
if not UserService.has_user_accepted_license(user_id, project.license_id):
return False, ValidatingNotAllowed.USER_NOT_ACCEPTED_LICENSE
validation_permission = project.validation_permission
# is_admin or is_author or is_org_manager or is_manager_team
is_manager_permission = False
if ProjectAdminService.is_user_action_permitted_on_project(user_id, project_id):
is_manager_permission = True
# Draft (public/private) accessible only for is_manager_permission
if (
ProjectStatus(project.status) == ProjectStatus.DRAFT
and not is_manager_permission
):
return False, ValidatingNotAllowed.PROJECT_NOT_PUBLISHED
is_restriction = None
if not is_manager_permission and validation_permission:
is_restriction = ProjectService.evaluate_validation_permission(
project_id, user_id, validation_permission
)
tasks = Task.get_locked_tasks_for_user(user_id)
if len(tasks.locked_tasks) > 0:
return False, ValidatingNotAllowed.USER_ALREADY_HAS_TASK_LOCKED
is_allowed_user = None
if project.private and not is_manager_permission:
# Check if user is in allowed user list
is_allowed_user = ProjectService.is_user_in_the_allowed_list(
project.allowed_users, user_id
)
if is_allowed_user:
return True, "User allowed to validate"
if not is_manager_permission and is_restriction:
return is_restriction
elif project.private and not (
is_manager_permission or is_allowed_user or not is_restriction
):
return False, ValidatingNotAllowed.USER_NOT_ON_ALLOWED_LIST
return True, "User allowed to validate"
@staticmethod
@cached(summary_cache)
def get_project_summary(
project_id: int, preferred_locale: str = "en"
) -> ProjectSummary:
""" Gets the project summary DTO """
project = ProjectService.get_project_by_id(project_id)
return project.get_project_summary(preferred_locale)
@staticmethod
def set_project_as_featured(project_id: int):
""" Sets project as featured """
project = ProjectService.get_project_by_id(project_id)
project.set_as_featured()
@staticmethod
def unset_project_as_featured(project_id: int):
""" Sets project as featured """
project = ProjectService.get_project_by_id(project_id)
project.unset_as_featured()
@staticmethod
def get_featured_projects(preferred_locale):
""" Sets project as featured """
query = ProjectSearchService.create_search_query()
projects = query.filter(Project.featured == true()).group_by(Project.id).all()
# Get total contributors.
contrib_counts = ProjectSearchService.get_total_contributions(projects)
zip_items = zip(projects, contrib_counts)
dto = ProjectSearchResultsDTO()
dto.results = [
ProjectSearchService.create_result_dto(p, preferred_locale, t)
for p, t in zip_items
]
return dto
@staticmethod
def is_favorited(project_id: int, user_id: int) -> bool:
project = ProjectService.get_project_by_id(project_id)
return project.is_favorited(user_id)
@staticmethod
def favorite(project_id: int, user_id: int):
project = ProjectService.get_project_by_id(project_id)
project.favorite(user_id)
@staticmethod
def unfavorite(project_id: int, user_id: int):
project = ProjectService.get_project_by_id(project_id)
project.unfavorite(user_id)
@staticmethod
def get_project_title(project_id: int, preferred_locale: str = "en") -> str:
""" Gets the project title DTO """
project = ProjectService.get_project_by_id(project_id)
return project.get_project_title(preferred_locale)
@staticmethod
@cached(TTLCache(maxsize=1024, ttl=600))
def get_project_stats(project_id: int) -> ProjectStatsDTO:
""" Gets the project stats DTO """
project = ProjectService.get_project_by_id(project_id)
return project.get_project_stats()
@staticmethod
def get_project_user_stats(project_id: int, username: str) -> ProjectUserStatsDTO:
""" Gets the user stats for a specific project """
project = ProjectService.get_project_by_id(project_id)
user = UserService.get_user_by_username(username)
return project.get_project_user_stats(user.id)
def get_project_teams(project_id: int):
project = ProjectService.get_project_by_id(project_id)
if project is None:
raise NotFound()
return project.teams
@staticmethod
def get_project_organisation(project_id: int) -> Organisation:
project = ProjectService.get_project_by_id(project_id)
if project is None:
raise NotFound()
return project.organisation