backend/services/users/user_service.py (632 lines of code) (raw):
from cachetools import TTLCache, cached
from flask import current_app
import datetime
from sqlalchemy.sql.expression import literal
from sqlalchemy import func, or_, desc, and_, distinct, cast, Time
from backend import db
from backend.models.dtos.project_dto import ProjectFavoritesDTO, ProjectSearchResultsDTO
from backend.models.dtos.user_dto import (
UserDTO,
UserOSMDTO,
UserFilterDTO,
UserSearchQuery,
UserSearchDTO,
UserStatsDTO,
UserContributionDTO,
UserRegisterEmailDTO,
UserCountryContributed,
UserCountriesContributed,
)
from backend.models.dtos.interests_dto import InterestsListDTO, InterestDTO
from backend.models.postgis.interests import Interest, project_interests
from backend.models.postgis.message import Message
from backend.models.postgis.project import Project
from backend.models.postgis.user import User, UserRole, MappingLevel, UserEmail
from backend.models.postgis.task import TaskHistory, TaskAction, Task
from backend.models.dtos.user_dto import UserTaskDTOs
from backend.models.dtos.stats_dto import Pagination
from backend.models.postgis.statuses import TaskStatus, ProjectStatus
from backend.models.postgis.utils import NotFound
from backend.services.users.osm_service import OSMService, OSMServiceError
from backend.services.messaging.smtp_service import SMTPService
from backend.services.messaging.template_service import (
get_txt_template,
template_var_replacing,
)
user_filter_cache = TTLCache(maxsize=1024, ttl=600)
class UserServiceError(Exception):
""" Custom Exception to notify callers an error occurred when in the User Service """
def __init__(self, message):
if current_app:
current_app.logger.debug(message)
class UserService:
@staticmethod
def get_user_by_id(user_id: int) -> User:
user = User.get_by_id(user_id)
if user is None:
raise NotFound()
return user
@staticmethod
def get_user_by_username(username: str) -> User:
user = User.get_by_username(username)
if user is None:
raise NotFound()
return user
@staticmethod
def get_contributions_by_day(user_id: int):
# Validate that user exists.
stats = (
TaskHistory.query.with_entities(
func.DATE(TaskHistory.action_date).label("day"),
func.count(TaskHistory.action).label("cnt"),
)
.filter(TaskHistory.user_id == user_id)
.filter(TaskHistory.action == TaskAction.STATE_CHANGE.name)
.filter(
func.DATE(TaskHistory.action_date)
> datetime.date.today() - datetime.timedelta(days=365)
)
.group_by("day")
.order_by(desc("day"))
)
contributions = [
UserContributionDTO(dict(date=str(s[0]), count=s[1])) for s in stats
]
return contributions
@staticmethod
def get_project_managers() -> User:
users = User.query.filter(User.role == 2).all()
if users is None:
raise NotFound()
return users
@staticmethod
def get_general_admins() -> User:
users = User.query.filter(User.role == 1).all()
if users is None:
raise NotFound()
return users
@staticmethod
def update_user(user_id: int, osm_username: str, picture_url: str) -> User:
user = UserService.get_user_by_id(user_id)
if user.username != osm_username:
user.update_username(osm_username)
if picture_url is not None and user.picture_url != picture_url:
user.update_picture_url(picture_url)
return user
@staticmethod
def get_projects_favorited(user_id: int) -> ProjectFavoritesDTO:
user = UserService.get_user_by_id(user_id)
projects_dto = [f.as_dto_for_admin(f.id) for f in user.favorites]
fav_dto = ProjectFavoritesDTO()
fav_dto.favorited_projects = projects_dto
return fav_dto
@staticmethod
def get_projects_mapped(user_id: int):
user = UserService.get_user_by_id(user_id)
projects_mapped = user.projects_mapped
# Return empty list if the user has no projects_mapped.
if projects_mapped is None:
return []
return projects_mapped
@staticmethod
def register_user(osm_id, username, changeset_count, picture_url, email):
"""
Creates user in DB
:param osm_id: Unique OSM user id
:param username: OSM Username
:param changeset_count: OSM changeset count
"""
new_user = User()
new_user.id = osm_id
new_user.username = username
if picture_url is not None:
new_user.picture_url = picture_url
intermediate_level = current_app.config["MAPPER_LEVEL_INTERMEDIATE"]
advanced_level = current_app.config["MAPPER_LEVEL_ADVANCED"]
if changeset_count > advanced_level:
new_user.mapping_level = MappingLevel.ADVANCED.value
elif intermediate_level < changeset_count < advanced_level:
new_user.mapping_level = MappingLevel.INTERMEDIATE.value
else:
new_user.mapping_level = MappingLevel.BEGINNER.value
if email is not None:
new_user.email_address = email
new_user.create()
return new_user
@staticmethod
def get_user_dto_by_username(
requested_username: str, logged_in_user_id: int
) -> UserDTO:
"""Gets user DTO for supplied username """
requested_user = UserService.get_user_by_username(requested_username)
logged_in_user = UserService.get_user_by_id(logged_in_user_id)
UserService.check_and_update_mapper_level(requested_user.id)
return requested_user.as_dto(logged_in_user.username)
@staticmethod
def get_user_dto_by_id(user: int, request_user: int) -> UserDTO:
"""Gets user DTO for supplied user id """
user = UserService.get_user_by_id(user)
if request_user:
request_username = UserService.get_user_by_id(request_user).username
return user.as_dto(request_username)
return user.as_dto()
@staticmethod
def get_interests_stats(user_id):
# Get all projects that the user has contributed.
stmt = (
TaskHistory.query.with_entities(TaskHistory.project_id)
.distinct()
.filter(TaskHistory.user_id == user_id)
)
interests = (
Interest.query.with_entities(
Interest.id,
Interest.name,
func.count(distinct(project_interests.c.project_id)).label(
"count_projects"
),
)
.join(
project_interests,
and_(
Interest.id == project_interests.c.interest_id,
project_interests.c.project_id.in_(stmt),
),
)
.group_by(Interest.id)
.order_by(desc("count_projects"))
.all()
)
interests_dto = [
InterestDTO(dict(id=i.id, name=i.name, count_projects=i.count_projects))
for i in interests
]
return interests_dto
@staticmethod
def get_tasks_dto(
user_id: int,
start_date: datetime.datetime = None,
end_date: datetime.datetime = None,
task_status: str = None,
project_status: str = None,
project_id: int = None,
page=1,
page_size=10,
sort_by: str = None,
) -> UserTaskDTOs:
base_query = (
TaskHistory.query.with_entities(
TaskHistory.project_id.label("project_id"),
TaskHistory.task_id.label("task_id"),
func.max(TaskHistory.action_date).label("max"),
)
.filter(TaskHistory.user_id == user_id)
.group_by(TaskHistory.task_id, TaskHistory.project_id)
)
if task_status:
base_query = base_query.filter(
TaskHistory.action_text == TaskStatus[task_status.upper()].name
)
if start_date:
base_query = base_query.filter(TaskHistory.action_date >= start_date)
if end_date:
base_query = base_query.filter(TaskHistory.action_date <= end_date)
if sort_by == "action_date":
base_query = base_query.order_by(func.max(TaskHistory.action_date))
elif sort_by == "-action_date":
base_query = base_query.order_by(desc(func.max(TaskHistory.action_date)))
user_task_dtos = UserTaskDTOs()
task_id_list = base_query.subquery()
comments_query = (
TaskHistory.query.with_entities(
TaskHistory.project_id,
TaskHistory.task_id,
func.count(TaskHistory.action).label("count"),
)
.filter(TaskHistory.action == "COMMENT")
.group_by(TaskHistory.task_id, TaskHistory.project_id)
).subquery()
sq = (
db.session.query(
func.coalesce(comments_query.c.count, 0).label("comments"), task_id_list
)
.select_from(task_id_list)
.outerjoin(
comments_query,
(comments_query.c.task_id == task_id_list.c.task_id)
& (comments_query.c.project_id == task_id_list.c.project_id),
)
.subquery()
)
tasks = Task.query.join(
sq,
and_(
Task.id == sq.c.task_id,
Task.project_id == sq.c.project_id,
),
)
tasks = tasks.add_columns("max", "comments")
if project_status:
tasks = tasks.filter(
Task.project_id == Project.id,
Project.status == ProjectStatus[project_status.upper()].value,
)
if project_id:
tasks = tasks.filter_by(project_id=project_id)
results = tasks.paginate(page, page_size, True)
task_list = []
for task, action_date, comments in results.items:
task_list.append(task.as_dto(last_updated=action_date, comments=comments))
user_task_dtos.user_tasks = task_list
user_task_dtos.pagination = Pagination(results)
return user_task_dtos
@staticmethod
def get_detailed_stats(username: str):
user = UserService.get_user_by_username(username)
stats_dto = UserStatsDTO()
actions = [
TaskStatus.VALIDATED.name,
TaskStatus.INVALIDATED.name,
TaskStatus.MAPPED.name,
]
actions_table = (
db.session.query(literal(TaskStatus.VALIDATED.name).label("action_text"))
.union(
db.session.query(
literal(TaskStatus.INVALIDATED.name).label("action_text")
),
db.session.query(literal(TaskStatus.MAPPED.name).label("action_text")),
)
.subquery()
.alias("actions_table")
)
# Get only rows with the given actions.
filtered_actions = (
TaskHistory.query.with_entities(
TaskHistory.user_id,
TaskHistory.project_id,
TaskHistory.task_id,
TaskHistory.action_text,
)
.filter(TaskHistory.action_text.in_(actions))
.subquery()
.alias("filtered_actions")
)
user_tasks = (
db.session.query(filtered_actions)
.filter(filtered_actions.c.user_id == user.id)
.subquery()
.alias("user_tasks")
)
others_tasks = (
db.session.query(filtered_actions)
.filter(filtered_actions.c.user_id != user.id)
.filter(filtered_actions.c.task_id == user_tasks.c.task_id)
.filter(filtered_actions.c.project_id == user_tasks.c.project_id)
.filter(filtered_actions.c.action_text != TaskStatus.MAPPED.name)
.subquery()
.alias("others_tasks")
)
user_stats = (
db.session.query(
actions_table.c.action_text, func.count(user_tasks.c.action_text)
)
.outerjoin(
user_tasks, actions_table.c.action_text == user_tasks.c.action_text
)
.group_by(actions_table.c.action_text)
)
others_stats = (
db.session.query(
func.concat(actions_table.c.action_text, "_BY_OTHERS"),
func.count(others_tasks.c.action_text),
)
.outerjoin(
others_tasks, actions_table.c.action_text == others_tasks.c.action_text
)
.group_by(actions_table.c.action_text)
)
res = user_stats.union(others_stats).all()
results = {key: value for key, value in res}
projects_mapped = UserService.get_projects_mapped(user.id)
stats_dto.tasks_mapped = results["MAPPED"]
stats_dto.tasks_validated = results["VALIDATED"]
stats_dto.tasks_invalidated = results["INVALIDATED"]
stats_dto.tasks_validated_by_others = results["VALIDATED_BY_OTHERS"]
stats_dto.tasks_invalidated_by_others = results["INVALIDATED_BY_OTHERS"]
stats_dto.projects_mapped = len(projects_mapped)
stats_dto.countries_contributed = UserService.get_countries_contributed(user.id)
stats_dto.contributions_by_day = UserService.get_contributions_by_day(user.id)
stats_dto.total_time_spent = 0
stats_dto.time_spent_mapping = 0
stats_dto.time_spent_validating = 0
query = (
TaskHistory.query.with_entities(
func.date_trunc("minute", TaskHistory.action_date).label("trn"),
func.max(TaskHistory.action_text).label("tm"),
)
.filter(TaskHistory.user_id == user.id)
.filter(TaskHistory.action == "LOCKED_FOR_VALIDATION")
.group_by("trn")
.subquery()
)
total_validation_time = db.session.query(
func.sum(cast(func.to_timestamp(query.c.tm, "HH24:MI:SS"), Time))
).scalar()
if total_validation_time:
stats_dto.time_spent_validating = total_validation_time.total_seconds()
stats_dto.total_time_spent += stats_dto.time_spent_validating
total_mapping_time = (
db.session.query(
func.sum(
cast(func.to_timestamp(TaskHistory.action_text, "HH24:MI:SS"), Time)
)
)
.filter(
or_(
TaskHistory.action == TaskAction.LOCKED_FOR_MAPPING.name,
TaskHistory.action == TaskAction.AUTO_UNLOCKED_FOR_MAPPING.name,
)
)
.filter(TaskHistory.user_id == user.id)
.scalar()
)
if total_mapping_time:
stats_dto.time_spent_mapping = total_mapping_time.total_seconds()
stats_dto.total_time_spent += stats_dto.time_spent_mapping
stats_dto.contributions_interest = UserService.get_interests_stats(user.id)
return stats_dto
@staticmethod
def update_user_details(user_id: int, user_dto: UserDTO) -> dict:
"""Update user with info supplied by user, if they add or change their email address a verification mail
will be sent"""
user = UserService.get_user_by_id(user_id)
verification_email_sent = False
if (
user_dto.email_address
and user.email_address != user_dto.email_address.lower()
):
# Send user verification email if they are adding or changing their email address
SMTPService.send_verification_email(
user_dto.email_address.lower(), user.username
)
user.set_email_verified_status(is_verified=False)
verification_email_sent = True
user.update(user_dto)
user_email = UserEmail.query.filter(
UserEmail.email == user_dto.email_address
).one_or_none()
if user_email is not None:
user_email.delete()
return dict(verificationEmailSent=verification_email_sent)
@staticmethod
def get_all_users(query: UserSearchQuery) -> UserSearchDTO:
""" Gets paginated list of users """
return User.get_all_users(query)
@staticmethod
@cached(user_filter_cache)
def filter_users(username: str, project_id: int, page: int) -> UserFilterDTO:
""" Gets paginated list of users, filtered by username, for autocomplete """
return User.filter_users(username, project_id, page)
@staticmethod
def is_user_an_admin(user_id: int) -> bool:
""" Is the user an admin """
user = UserService.get_user_by_id(user_id)
if UserRole(user.role) == UserRole.ADMIN:
return True
return False
@staticmethod
def is_user_the_project_author(user_id: int, author_id: int) -> bool:
""" Is user the author of the project """
return user_id == author_id
@staticmethod
def get_mapping_level(user_id: int):
""" Gets mapping level user is at"""
user = UserService.get_user_by_id(user_id)
return MappingLevel(user.mapping_level)
@staticmethod
def is_user_validator(user_id: int) -> bool:
""" Determines if user is a validator """
user = UserService.get_user_by_id(user_id)
if UserRole(user.role) in [
UserRole.ADMIN,
]:
return True
return False
@staticmethod
def is_user_blocked(user_id: int) -> bool:
""" Determines if a user is blocked """
user = UserService.get_user_by_id(user_id)
if UserRole(user.role) == UserRole.READ_ONLY:
return True
return False
@staticmethod
def get_countries_contributed(user_id: int):
query = (
TaskHistory.query.with_entities(
func.unnest(Project.country).label("country"),
TaskHistory.action_text,
func.count(TaskHistory.action_text).label("count"),
)
.filter(TaskHistory.user_id == user_id)
.filter(
TaskHistory.action_text.in_(
[
TaskStatus.MAPPED.name,
TaskStatus.BADIMAGERY.name,
TaskStatus.VALIDATED.name,
]
)
)
.group_by("country", TaskHistory.action_text)
.outerjoin(Project, Project.id == TaskHistory.project_id)
.all()
)
countries = list(set([q.country for q in query]))
result = []
for country in countries:
values = [q for q in query if q.country == country]
# Filter element to sum mapped values.
mapped = sum(
[
v.count
for v in values
if v.action_text
in [TaskStatus.MAPPED.name, TaskStatus.BADIMAGERY.name]
]
)
validated = sum(
[v.count for v in values if v.action_text == TaskStatus.VALIDATED.name]
)
dto = UserCountryContributed(
dict(
name=country,
mapped=mapped,
validated=validated,
total=mapped + validated,
)
)
result.append(dto)
# Order by total
result = sorted(result, reverse=True, key=lambda i: i.total)
countries_dto = UserCountriesContributed()
countries_dto.countries_contributed = result
countries_dto.total = len(result)
return countries_dto
@staticmethod
def upsert_mapped_projects(user_id: int, project_id: int):
""" Add project to mapped projects if it doesn't exist, otherwise return """
User.upsert_mapped_projects(user_id, project_id)
@staticmethod
def get_mapped_projects(user_name: str, preferred_locale: str):
""" Gets all projects a user has mapped or validated on """
user = UserService.get_user_by_username(user_name)
return User.get_mapped_projects(user.id, preferred_locale)
@staticmethod
def get_recommended_projects(user_name: str, preferred_locale: str):
""" Gets all projects a user has mapped or validated on """
from backend.services.project_search_service import ProjectSearchService
limit = 20
user = (
User.query.with_entities(User.id, User.mapping_level)
.filter(User.username == user_name)
.one_or_none()
)
if user is None:
raise NotFound()
# Get all projects that the user has contributed
sq = (
TaskHistory.query.with_entities(TaskHistory.project_id.label("project_id"))
.distinct(TaskHistory.project_id)
.filter(TaskHistory.user_id == user.id)
.subquery()
)
# Get all campaigns for all contributed projects.
campaign_tags = (
Project.query.with_entities(Project.campaign.label("tag"))
.filter(or_(Project.author_id == user.id, Project.id == sq.c.project_id))
.subquery()
)
# Get projects with given campaign tags but without user contributions.
query = ProjectSearchService.create_search_query()
projs = (
query.filter(Project.campaign.any(campaign_tags.c.tag)).limit(limit).all()
)
# Get only user mapping level projects.
len_projs = len(projs)
if len_projs < limit:
remaining_projs = (
query.filter(Project.mapper_level == user.mapping_level)
.limit(limit - len_projs)
.all()
)
projs.extend(remaining_projs)
dto = ProjectSearchResultsDTO()
# Get all total contributions for each paginated project.
contrib_counts = ProjectSearchService.get_total_contributions(projs)
zip_items = zip(projs, contrib_counts)
dto.results = [
ProjectSearchService.create_result_dto(p, "en", t) for p, t in zip_items
]
return dto
@staticmethod
def add_role_to_user(admin_user_id: int, username: str, role: str):
"""
Add role to user
:param admin_user_id: ID of admin attempting to add the role
:param username: Username of user the role should be added to
:param role: The requested role
:raises UserServiceError
"""
try:
requested_role = UserRole[role.upper()]
except KeyError:
raise UserServiceError(
f"Unknown role {role} accepted values are ADMIN, PROJECT_MANAGER, VALIDATOR"
)
admin = UserService.get_user_by_id(admin_user_id)
admin_role = UserRole(admin.role)
if admin_role != UserRole.ADMIN and requested_role == UserRole.ADMIN:
raise UserServiceError("You must be an Admin to assign Admin role")
user = UserService.get_user_by_username(username)
user.set_user_role(requested_role)
@staticmethod
def set_user_mapping_level(username: str, level: str) -> User:
"""
Sets the users mapping level
:raises: UserServiceError
"""
try:
requested_level = MappingLevel[level.upper()]
except KeyError:
raise UserServiceError(
f"Unknown role {level} accepted values are BEGINNER, INTERMEDIATE, ADVANCED"
)
user = UserService.get_user_by_username(username)
user.set_mapping_level(requested_level)
return user
@staticmethod
def set_user_is_expert(user_id: int, is_expert: bool) -> User:
"""
Enabled or disables expert mode for the user
:raises: UserServiceError
"""
user = UserService.get_user_by_id(user_id)
user.set_is_expert(is_expert)
return user
@staticmethod
def accept_license_terms(user_id: int, license_id: int):
""" Saves the fact user has accepted license terms """
user = UserService.get_user_by_id(user_id)
user.accept_license_terms(license_id)
@staticmethod
def has_user_accepted_license(user_id: int, license_id: int):
""" Checks if user has accepted specified license """
user = UserService.get_user_by_id(user_id)
return user.has_user_accepted_licence(license_id)
@staticmethod
def get_osm_details_for_user(username: str) -> UserOSMDTO:
"""
Gets OSM details for the user from OSM API
:param username: username in scope
:raises UserServiceError, NotFound
"""
user = UserService.get_user_by_username(username)
osm_dto = OSMService.get_osm_details_for_user(user.id)
return osm_dto
@staticmethod
def check_and_update_mapper_level(user_id: int):
""" Check users mapping level and update if they have crossed threshold """
user = UserService.get_user_by_id(user_id)
user_level = MappingLevel(user.mapping_level)
if user_level == MappingLevel.ADVANCED:
return # User has achieved highest level, so no need to do further checking
intermediate_level = current_app.config["MAPPER_LEVEL_INTERMEDIATE"]
advanced_level = current_app.config["MAPPER_LEVEL_ADVANCED"]
try:
osm_details = OSMService.get_osm_details_for_user(user_id)
if (
osm_details.changeset_count > advanced_level
and user.mapping_level != MappingLevel.ADVANCED.value
):
user.mapping_level = MappingLevel.ADVANCED.value
UserService.notify_level_upgrade(user_id, user.username, "ADVANCED")
elif (
intermediate_level < osm_details.changeset_count < advanced_level
and user.mapping_level != MappingLevel.INTERMEDIATE.value
):
user.mapping_level = MappingLevel.INTERMEDIATE.value
UserService.notify_level_upgrade(user_id, user.username, "INTERMEDIATE")
except OSMServiceError:
# Swallow exception as we don't want to blow up the server for this
current_app.logger.error("Error attempting to update mapper level")
return
user.save()
@staticmethod
def notify_level_upgrade(user_id: int, username: str, level: str):
text_template = get_txt_template("level_upgrade_message_en.txt")
replace_list = [
["[USERNAME]", username],
["[LEVEL]", level],
["[ORG_CODE]", current_app.config["ORG_CODE"]],
]
text_template = template_var_replacing(text_template, replace_list)
level_upgrade_message = Message()
level_upgrade_message.to_user_id = user_id
level_upgrade_message.subject = "Mapper level upgrade"
level_upgrade_message.message = text_template
level_upgrade_message.save()
@staticmethod
def refresh_mapper_level() -> int:
""" Helper function to run thru all users in the DB and update their mapper level """
users = User.get_all_users_not_paginated()
users_updated = 1
total_users = len(users)
for user in users:
UserService.check_and_update_mapper_level(user.id)
if users_updated % 50 == 0:
print(f"{users_updated} users updated of {total_users}")
users_updated += 1
return users_updated
@staticmethod
def register_user_with_email(user_dto: UserRegisterEmailDTO):
# Validate that user is not within the general users table.
user = User.query.filter(User.email_address == user_dto.email).one_or_none()
if user is not None:
details_msg = f"Email address {user_dto.email} already exists"
raise ValueError(details_msg)
user = UserEmail.query.filter(UserEmail.email == user_dto.email).one_or_none()
if user is None:
user = UserEmail(email=user_dto.email)
user.create()
return user
@staticmethod
def get_interests(user: User) -> InterestsListDTO:
dto = InterestsListDTO()
for interest in Interest.query.all():
int_dto = interest.as_dto()
if interest in user.interests:
int_dto.user_selected = True
dto.interests.append(int_dto)
return dto