backend/services/project_admin_service.py (230 lines of code) (raw):
import json
import geojson
from flask import current_app
from backend.models.dtos.project_dto import (
DraftProjectDTO,
ProjectDTO,
ProjectCommentsDTO,
ProjectSearchDTO,
)
from backend.models.postgis.project import Project, Task, ProjectStatus
from backend.models.postgis.statuses import TaskCreationMode, TeamRoles
from backend.models.postgis.task import TaskHistory, TaskStatus, TaskAction
from backend.models.postgis.utils import NotFound, InvalidData, InvalidGeoJson
from backend.services.grid.grid_service import GridService
from backend.services.license_service import LicenseService
from backend.services.users.user_service import UserService
from backend.services.organisation_service import OrganisationService
from backend.services.team_service import TeamService
class ProjectAdminServiceError(Exception):
    """ Custom Exception to notify callers an error occurred when validating a Project

    The message is also written to the Flask application's debug log when an
    application is available (``current_app`` is truthy).
    """

    def __init__(self, message):
        # Initialise the base class explicitly so the message is stored through
        # the normal Exception path rather than relying on BaseException.__new__.
        super().__init__(message)
        if current_app:
            current_app.logger.debug(message)
class ProjectStoreError(Exception):
    """ Custom Exception to notify callers an error occurred with database CRUD operations

    The message is also written to the Flask application's debug log when an
    application is available (``current_app`` is truthy).
    """

    def __init__(self, message):
        # Initialise the base class explicitly so the message is stored through
        # the normal Exception path rather than relying on BaseException.__new__.
        super().__init__(message)
        if current_app:
            current_app.logger.debug(message)
class ProjectAdminService:
    """ Project administration operations: create, update, delete, reset, transfer and permission checks """

    @staticmethod
    def _is_admin_or_org_manager(user_id: int) -> bool:
        """ Is the user an admin, or the manager of at least one organisation """
        if UserService.is_user_an_admin(user_id):
            return True
        managed = OrganisationService.get_organisations_managed_by_user_as_dto(user_id)
        return bool(managed.organisations)

    @staticmethod
    def create_draft_project(draft_project_dto: DraftProjectDTO) -> int:
        """
        Validates and then persists draft projects in the DB
        :param draft_project_dto: Draft Project DTO with data from API
        :raises ProjectAdminServiceError: user is neither an admin nor an organisation manager
        :raises NotFound: the organisation referenced by the DTO does not exist
        :raises InvalidGeoJson, InvalidData: the tasks could not be attached
        :returns ID of new draft project
        """
        user_id = draft_project_dto.user_id

        # First things first, we need to validate that the author_id is a PM. issue #1715
        if not ProjectAdminService._is_admin_or_org_manager(user_id):
            user = UserService.get_user_by_id(user_id)
            raise ProjectAdminServiceError(
                f"User {user.username} is not permitted to create project"
            )

        # If we're cloning we'll copy all the project details from the clone, otherwise create brand new project
        if draft_project_dto.cloneFromProjectId:
            draft_project = Project.clone(draft_project_dto.cloneFromProjectId, user_id)
        else:
            draft_project = Project()
            org = OrganisationService.get_organisation_by_id(
                draft_project_dto.organisation
            )
            if org is None:
                raise NotFound("Organisation does not exist")
            # Replace the organisation identifier on the DTO with the loaded organisation
            draft_project_dto.organisation = org
            draft_project.create_draft_project(draft_project_dto)

        draft_project.set_project_aoi(draft_project_dto)

        # if arbitrary_tasks requested, create tasks from aoi otherwise use tasks in DTO
        if draft_project_dto.has_arbitrary_tasks:
            tasks = GridService.tasks_from_aoi_features(
                draft_project_dto.area_of_interest
            )
            draft_project.task_creation_mode = TaskCreationMode.ARBITRARY.value
        else:
            tasks = draft_project_dto.tasks

        ProjectAdminService._attach_tasks_to_project(draft_project, tasks)

        if draft_project_dto.cloneFromProjectId:
            draft_project.save()  # Update the clone
        else:
            draft_project.create()  # Create the new project

        draft_project.set_default_changeset_comment()
        draft_project.set_country_info()
        return draft_project.id

    @staticmethod
    def _set_default_changeset_comment(draft_project: Project):
        """ Sets the default changeset comment when project created """
        default_comment = current_app.config["DEFAULT_CHANGESET_COMMENT"]
        draft_project.changeset_comment = f"{default_comment}-{draft_project.id}"
        draft_project.save()

    @staticmethod
    def _get_project_by_id(project_id: int) -> Project:
        """
        Loads the project
        :raises NotFound: no project exists with the supplied id
        """
        project = Project.get(project_id)

        if project is None:
            raise NotFound()

        return project

    @staticmethod
    def get_project_dto_for_admin(project_id: int) -> ProjectDTO:
        """ Get the project as DTO for project managers """
        project = ProjectAdminService._get_project_by_id(project_id)
        return project.as_dto_for_admin(project_id)

    @staticmethod
    def update_project(project_dto: ProjectDTO, authenticated_user_id: int):
        """
        Validates the supplied DTO and applies it to the stored project
        :param project_dto: Project DTO with the new values
        :param authenticated_user_id: User requesting the update
        :raises ProjectAdminServiceError: default locale info incomplete, or licence not found
        :raises NotFound: the project does not exist
        :raises ValueError: the user is not permitted to update the project
        :returns the updated Project
        """
        project_id = project_dto.project_id

        # The default locale only has to be complete once the project is published
        if project_dto.project_status == ProjectStatus.PUBLISHED.name:
            ProjectAdminService._validate_default_locale(
                project_dto.default_locale, project_dto.project_info_locales
            )

        if project_dto.license_id:
            ProjectAdminService._validate_imagery_licence(project_dto.license_id)

        if not ProjectAdminService.is_user_action_permitted_on_project(
            authenticated_user_id, project_id
        ):
            raise ValueError(
                str(project_id)
                + " :Project can only be updated by admins or by the owner"
            )

        project = ProjectAdminService._get_project_by_id(project_id)
        project.update(project_dto)
        return project

    @staticmethod
    def _validate_imagery_licence(license_id: int):
        """
        Ensures that the supplied license Id actually exists
        :raises ProjectAdminServiceError: the licence lookup raised NotFound
        """
        try:
            LicenseService.get_license_as_dto(license_id)
        except NotFound:
            raise ProjectAdminServiceError(f"LicenseId {license_id} not found")

    @staticmethod
    def delete_project(project_id: int, authenticated_user_id: int):
        """
        Deletes the project when the user is allowed to and the project reports it can be deleted
        :raises NotFound: the project does not exist
        :raises ProjectAdminServiceError: user lacks permission, or project cannot be deleted
        """
        project = ProjectAdminService._get_project_by_id(project_id)

        if not ProjectAdminService._is_admin_or_org_manager(authenticated_user_id):
            raise ProjectAdminServiceError(
                "User does not have permissions to delete project"
            )

        if not project.can_be_deleted():
            raise ProjectAdminServiceError(
                "Project has mapped tasks, cannot be deleted"
            )

        project.delete()

    @staticmethod
    def reset_all_tasks(project_id: int, user_id: int):
        """ Resets all tasks on project, preserving history"""
        tasks_to_reset = Task.query.filter(Task.project_id == project_id).all()

        for task in tasks_to_reset:
            # Record the reset as a comment in the task history before resetting the task
            task.set_task_history(
                TaskAction.COMMENT, user_id, "Task reset", TaskStatus.READY
            )
            task.reset_task(user_id)

        # Reset project counters
        project = ProjectAdminService._get_project_by_id(project_id)
        project.tasks_mapped = 0
        project.tasks_validated = 0
        project.tasks_bad_imagery = 0
        project.save()

    @staticmethod
    def get_all_comments(project_id: int) -> ProjectCommentsDTO:
        """
        Gets all comments mappers, validators have added to tasks associated with project
        :raises NotFound: the project has no comments
        """
        comments = TaskHistory.get_all_comments(project_id)

        if not comments.comments:
            raise NotFound("No comments found on project")

        return comments

    @staticmethod
    def _attach_tasks_to_project(draft_project: Project, tasks_geojson):
        """
        Validates then iterates over the array of tasks and attach them to the draft project
        :param draft_project: Draft project in scope
        :param tasks_geojson: GeoJSON feature collection of mapping tasks
        :raises InvalidGeoJson, InvalidData
        """
        # Round-trip through JSON so the input is parsed into geojson objects
        tasks = geojson.loads(json.dumps(tasks_geojson))

        if not isinstance(tasks, geojson.FeatureCollection):
            raise InvalidGeoJson("Tasks: Invalid GeoJson must be FeatureCollection")

        is_valid_geojson = geojson.is_valid(tasks)
        if is_valid_geojson["valid"] == "no":
            raise InvalidGeoJson(
                f"Tasks: Invalid FeatureCollection - {is_valid_geojson['message']}"
            )

        # Tasks are numbered from 1; task_count stays 0 when there are no features
        task_count = 0
        for task_count, feature in enumerate(tasks["features"], start=1):
            # InvalidData / InvalidGeoJson raised here propagate to the caller unchanged
            draft_project.tasks.append(Task.from_geojson_feature(task_count, feature))

        draft_project.total_tasks = task_count

    @staticmethod
    def _validate_default_locale(default_locale, project_info_locales):
        """
        Validates that all fields for the default project info locale have been completed
        :param default_locale: Admin supplied default locale
        :param project_info_locales: All locales supplied by admin
        :raises ProjectAdminServiceError
        :return: True if valid
        """
        default_info = None
        if default_locale:
            wanted_locale = default_locale.lower()
            # Locale comparison is case-insensitive; the first match wins
            default_info = next(
                (
                    info
                    for info in (project_info_locales or [])
                    if info.locale and info.locale.lower() == wanted_locale
                ),
                None,
            )

        if default_info is None:
            raise ProjectAdminServiceError(
                "Project Info for Default Locale not provided"
            )

        for attr, value in default_info.items():
            if attr == "per_task_instructions":
                continue  # Not mandatory field

            if not value:
                raise ProjectAdminServiceError(
                    f"{attr} not provided for Default Locale"
                )

        return True  # Indicates valid default locale for unit testing

    @staticmethod
    def get_projects_for_admin(
        admin_id: int, preferred_locale: str, search_dto: ProjectSearchDTO
    ):
        """ Get all projects for provided admin """
        return Project.get_projects_for_admin(admin_id, preferred_locale, search_dto)

    @staticmethod
    def transfer_project_to(project_id: int, transfering_user_id: int, username: str):
        """
        Transfers project from old owner (transfering_user_id) to new owner (username)
        :raises NotFound: the project does not exist
        :raises ValueError: either user is not permitted to act on the project
        """
        project = ProjectAdminService._get_project_by_id(project_id)

        # Check permissions for the user (transfering_user_id) who initiated the action
        if not ProjectAdminService.is_user_action_permitted_on_project(
            transfering_user_id, project_id
        ):
            raise ValueError("User action not permitted")

        new_owner = UserService.get_user_by_username(username)

        # Check permissions for the new owner - must be an admin or project's org manager or a PM team member
        if not ProjectAdminService.is_user_action_permitted_on_project(
            new_owner.id, project_id
        ):
            raise ValueError("User action not permitted")

        # Record the new owner; saving without this leaves the author unchanged
        project.author_id = new_owner.id
        project.save()

    @staticmethod
    def is_user_action_permitted_on_project(
        authenticated_user_id: int, project_id: int
    ) -> bool:
        """
        Is user action permitted on project
        Permitted users are admins, the project author, a manager of the project's
        organisation, or a member of a team holding the PROJECT_MANAGER role on the project
        :raises NotFound: the project does not exist
        """
        project = ProjectAdminService._get_project_by_id(project_id)

        if UserService.is_user_an_admin(authenticated_user_id):
            return True

        if UserService.is_user_the_project_author(
            authenticated_user_id, project.author_id
        ):
            return True

        org_id = getattr(project, "organisation_id", None)
        if org_id and OrganisationService.is_user_an_org_manager(
            org_id, authenticated_user_id
        ):
            return True

        # Last resort: membership of a team with the project manager role
        allowed_roles = [TeamRoles.PROJECT_MANAGER.value]
        return bool(
            TeamService.check_team_membership(
                project_id, allowed_roles, authenticated_user_id
            )
        )