backend/services/mapping_service.py (279 lines of code) (raw):
import datetime
import xml.etree.ElementTree as ET
from flask import current_app
from geoalchemy2 import shape
from backend.models.dtos.mapping_dto import (
TaskDTO,
MappedTaskDTO,
LockTaskDTO,
StopMappingTaskDTO,
TaskCommentDTO,
)
from backend.models.postgis.statuses import MappingNotAllowed
from backend.models.postgis.task import Task, TaskStatus, TaskHistory, TaskAction
from backend.models.postgis.utils import NotFound, UserLicenseError
from backend.services.messaging.message_service import MessageService
from backend.services.project_service import ProjectService
from backend.services.stats_service import StatsService
class MappingServiceError(Exception):
""" Custom Exception to notify callers an error occurred when handling mapping """
def __init__(self, message):
if current_app:
current_app.logger.debug(message)
class MappingService:
@staticmethod
def get_task(task_id: int, project_id: int) -> Task:
"""
Get task from DB
:raises: NotFound
"""
task = Task.get(task_id, project_id)
if task is None:
raise NotFound()
return task
@staticmethod
def get_task_as_dto(
task_id: int,
project_id: int,
preferred_local: str = "en",
) -> TaskDTO:
""" Get task as DTO for transmission over API """
task = MappingService.get_task(task_id, project_id)
task_dto = task.as_dto_with_instructions(preferred_local)
return task_dto
@staticmethod
def _is_task_undoable(logged_in_user_id: int, task: Task) -> bool:
""" Determines if the current task status can be undone by the logged in user """
# Test to see if user can undo status on this task
if logged_in_user_id and TaskStatus(task.task_status) not in [
TaskStatus.LOCKED_FOR_MAPPING,
TaskStatus.LOCKED_FOR_VALIDATION,
TaskStatus.READY,
]:
last_action = TaskHistory.get_last_action(task.project_id, task.id)
# User requesting task made the last change, so they are allowed to undo it.
if last_action.user_id == int(
logged_in_user_id
) or ProjectService.is_user_permitted_to_validate(
task.project_id, logged_in_user_id
):
return True
return False
@staticmethod
def lock_task_for_mapping(lock_task_dto: LockTaskDTO) -> TaskDTO:
"""
Sets the task_locked status to locked so no other user can work on it
:param lock_task_dto: DTO with data needed to lock the task
:raises TaskServiceError
:return: Updated task, or None if not found
"""
task = MappingService.get_task(lock_task_dto.task_id, lock_task_dto.project_id)
if not task.is_mappable():
raise MappingServiceError("Task in invalid state for mapping")
user_can_map, error_reason = ProjectService.is_user_permitted_to_map(
lock_task_dto.project_id, lock_task_dto.user_id
)
if not user_can_map:
if error_reason == MappingNotAllowed.USER_NOT_ACCEPTED_LICENSE:
raise UserLicenseError("User must accept license to map this task")
else:
raise MappingServiceError(
f"Mapping not allowed because: {error_reason}"
)
task.lock_task_for_mapping(lock_task_dto.user_id)
return task.as_dto_with_instructions(lock_task_dto.preferred_locale)
@staticmethod
def unlock_task_after_mapping(mapped_task: MappedTaskDTO) -> TaskDTO:
""" Unlocks the task and sets the task history appropriately """
task = MappingService.get_task_locked_by_user(
mapped_task.project_id, mapped_task.task_id, mapped_task.user_id
)
new_state = TaskStatus[mapped_task.status.upper()]
if new_state not in [
TaskStatus.MAPPED,
TaskStatus.BADIMAGERY,
TaskStatus.READY,
]:
raise MappingServiceError(
"Can only set status to MAPPED, BADIMAGERY, READY after mapping"
)
# Update stats around the change of state
last_state = TaskHistory.get_last_status(
mapped_task.project_id, mapped_task.task_id, True
)
StatsService.update_stats_after_task_state_change(
mapped_task.project_id, mapped_task.user_id, last_state, new_state
)
if mapped_task.comment:
# Parses comment to see if any users have been @'d
MessageService.send_message_after_comment(
mapped_task.user_id,
mapped_task.comment,
task.id,
mapped_task.project_id,
)
task.unlock_task(mapped_task.user_id, new_state, mapped_task.comment)
return task.as_dto_with_instructions(mapped_task.preferred_locale)
@staticmethod
def stop_mapping_task(stop_task: StopMappingTaskDTO) -> TaskDTO:
""" Unlocks the task and revert the task status to the last one """
task = MappingService.get_task_locked_by_user(
stop_task.project_id, stop_task.task_id, stop_task.user_id
)
if stop_task.comment:
# Parses comment to see if any users have been @'d
MessageService.send_message_after_comment(
stop_task.user_id, stop_task.comment, task.id, stop_task.project_id
)
task.reset_lock(stop_task.user_id, stop_task.comment)
return task.as_dto_with_instructions(stop_task.preferred_locale)
@staticmethod
def get_task_locked_by_user(project_id: int, task_id: int, user_id: int) -> Task:
"""
Returns task specified by project id and task id if found and locked for mapping by user
:raises: MappingServiceError
"""
task = MappingService.get_task(task_id, project_id)
if task is None:
raise MappingServiceError(f"Task {task_id} not found")
current_state = TaskStatus(task.task_status)
if current_state != TaskStatus.LOCKED_FOR_MAPPING:
raise MappingServiceError("Status must be LOCKED_FOR_MAPPING to unlock")
if task.locked_by != user_id:
raise MappingServiceError(
"Attempting to unlock a task owned by another user"
)
return task
@staticmethod
def add_task_comment(task_comment: TaskCommentDTO) -> TaskDTO:
""" Adds the comment to the task history """
task = Task.get(task_comment.task_id, task_comment.project_id)
if task is None:
raise MappingServiceError(f"Task {task_comment.task_id} not found")
task.set_task_history(
TaskAction.COMMENT, task_comment.user_id, task_comment.comment
)
# Parse comment to see if any users have been @'d
MessageService.send_message_after_comment(
task_comment.user_id, task_comment.comment, task.id, task_comment.project_id
)
task.update()
return task.as_dto_with_instructions(task_comment.preferred_locale)
@staticmethod
def generate_gpx(project_id: int, task_ids_str: str, timestamp=None):
"""
Creates a GPX file for supplied tasks. Timestamp is for unit testing only.
You can use the following URL to test locally:
http://www.openstreetmap.org/edit?editor=id&#map=11/31.50362930069913/34.628906243797054&comment=CHANGSET_COMMENT&gpx=http://localhost:5000/api/v2/projects/{project_id}/tasks/queries/gpx%3Ftasks=2
"""
if timestamp is None:
timestamp = datetime.datetime.utcnow()
root = ET.Element(
"gpx",
attrib=dict(
xmlns="http://www.topografix.com/GPX/1/1",
version="1.1",
creator="HOT Tasking Manager",
),
)
# Create GPX Metadata element
metadata = ET.Element("metadata")
link = ET.SubElement(
metadata,
"link",
attrib=dict(href="https://github.com/hotosm/tasking-manager"),
)
ET.SubElement(link, "text").text = "HOT Tasking Manager"
ET.SubElement(metadata, "time").text = timestamp.isoformat()
root.append(metadata)
# Create trk element
trk = ET.Element("trk")
root.append(trk)
ET.SubElement(
trk, "name"
).text = f"Task for project {project_id}. Do not edit outside of this area!"
# Construct trkseg elements
if task_ids_str is not None:
task_ids = map(int, task_ids_str.split(","))
tasks = Task.get_tasks(project_id, task_ids)
if not tasks or len(tasks) == 0:
raise NotFound()
else:
tasks = Task.get_all_tasks(project_id)
if not tasks or len(tasks) == 0:
raise NotFound()
for task in tasks:
task_geom = shape.to_shape(task.geometry)
for poly in task_geom:
trkseg = ET.SubElement(trk, "trkseg")
for point in poly.exterior.coords:
ET.SubElement(
trkseg,
"trkpt",
attrib=dict(lon=str(point[0]), lat=str(point[1])),
)
# Append wpt elements to end of doc
wpt = ET.Element(
"wpt", attrib=dict(lon=str(point[0]), lat=str(point[1]))
)
root.append(wpt)
xml_gpx = ET.tostring(root, encoding="utf8")
return xml_gpx
@staticmethod
def generate_osm_xml(project_id: int, task_ids_str: str) -> str:
"""Generate xml response suitable for loading into JOSM. A sample output file is in
/backend/helpers/testfiles/osm-sample.xml"""
# Note XML created with upload No to ensure it will be rejected by OSM if uploaded by mistake
root = ET.Element(
"osm",
attrib=dict(version="0.6", upload="never", creator="HOT Tasking Manager"),
)
if task_ids_str:
task_ids = map(int, task_ids_str.split(","))
tasks = Task.get_tasks(project_id, task_ids)
if not tasks or len(tasks) == 0:
raise NotFound()
else:
tasks = Task.get_all_tasks(project_id)
if not tasks or len(tasks) == 0:
raise NotFound()
fake_id = -1 # We use fake-ids to ensure XML will not be validated by OSM
for task in tasks:
task_geom = shape.to_shape(task.geometry)
way = ET.SubElement(
root,
"way",
attrib=dict(id=str((task.id * -1)), action="modify", visible="true"),
)
for poly in task_geom:
for point in poly.exterior.coords:
ET.SubElement(
root,
"node",
attrib=dict(
action="modify",
visible="true",
id=str(fake_id),
lon=str(point[0]),
lat=str(point[1]),
),
)
ET.SubElement(way, "nd", attrib=dict(ref=str(fake_id)))
fake_id -= 1
xml_gpx = ET.tostring(root, encoding="utf8")
return xml_gpx
@staticmethod
def undo_mapping(
project_id: int, task_id: int, user_id: int, preferred_locale: str = "en"
) -> TaskDTO:
""" Allows a user to Undo the task state they updated """
task = MappingService.get_task(task_id, project_id)
if not MappingService._is_task_undoable(user_id, task):
raise MappingServiceError("Undo not allowed for this user")
current_state = TaskStatus(task.task_status)
undo_state = TaskHistory.get_last_status(project_id, task_id, True)
# Refer to last action for user of it.
last_action = TaskHistory.get_last_action(project_id, task_id)
StatsService.update_stats_after_task_state_change(
project_id, last_action.user_id, current_state, undo_state, "undo"
)
task.unlock_task(
user_id,
undo_state,
f"Undo state from {current_state.name} to {undo_state.name}",
True,
)
return task.as_dto_with_instructions(preferred_locale)
@staticmethod
def map_all_tasks(project_id: int, user_id: int):
""" Marks all tasks on a project as mapped """
tasks_to_map = Task.query.filter(
Task.project_id == project_id,
Task.task_status.notin_(
[
TaskStatus.BADIMAGERY.value,
TaskStatus.MAPPED.value,
TaskStatus.VALIDATED.value,
]
),
).all()
for task in tasks_to_map:
if TaskStatus(task.task_status) not in [
TaskStatus.LOCKED_FOR_MAPPING,
TaskStatus.LOCKED_FOR_VALIDATION,
]:
# Only lock tasks that are not already locked to avoid double lock issue
task.lock_task_for_mapping(user_id)
task.unlock_task(user_id, new_state=TaskStatus.MAPPED)
# Set counters to fully mapped
project = ProjectService.get_project_by_id(project_id)
project.tasks_mapped = project.total_tasks - project.tasks_bad_imagery
project.save()
@staticmethod
def reset_all_badimagery(project_id: int, user_id: int):
""" Marks all bad imagery tasks ready for mapping """
badimagery_tasks = Task.query.filter(
Task.task_status == TaskStatus.BADIMAGERY.value
).all()
for task in badimagery_tasks:
task.lock_task_for_mapping(user_id)
task.unlock_task(user_id, new_state=TaskStatus.READY)
# Reset bad imagery counter
project = ProjectService.get_project_by_id(project_id)
project.tasks_bad_imagery = 0
project.save()