backend/services/project_search_service.py (449 lines of code) (raw):
from flask import current_app
import math
import geojson
from geoalchemy2 import shape
from sqlalchemy import func, distinct, desc, or_, and_
from shapely.geometry import Polygon, box
from cachetools import TTLCache, cached
from backend import db
from backend.api.utils import validate_date_input
from backend.models.dtos.project_dto import (
ProjectSearchDTO,
ProjectSearchResultsDTO,
ListSearchResultDTO,
Pagination,
ProjectSearchBBoxDTO,
)
from backend.models.postgis.project import Project, ProjectInfo, ProjectTeams
from backend.models.postgis.statuses import (
ProjectStatus,
MappingLevel,
MappingTypes,
ProjectPriority,
UserRole,
TeamRoles,
ValidationPermission,
MappingPermission,
)
from backend.models.postgis.campaign import Campaign
from backend.models.postgis.organisation import Organisation
from backend.models.postgis.task import TaskHistory
from backend.models.postgis.utils import (
NotFound,
ST_Intersects,
ST_MakeEnvelope,
ST_Transform,
ST_Area,
)
from backend.models.postgis.interests import project_interests
from backend.services.users.user_service import UserService
search_cache = TTLCache(maxsize=128, ttl=300)
# max area allowed for passed in bbox, calculation shown to help future maintenance
# client resolution (mpp)* arbitrary large map size on a large screen in pixels * 50% buffer, all squared
MAX_AREA = math.pow(1250 * 4275 * 1.5, 2)
class ProjectSearchServiceError(Exception):
""" Custom Exception to notify callers an error occurred when handling mapping """
def __init__(self, message):
if current_app:
current_app.logger.debug(message)
class BBoxTooBigError(Exception):
""" Custom Exception to notify callers an error occurred when handling mapping """
def __init__(self, message):
if current_app:
current_app.logger.debug(message)
class ProjectSearchService:
@staticmethod
def create_search_query(user=None):
query = (
db.session.query(
Project.id.label("id"),
Project.mapper_level,
Project.priority,
Project.default_locale,
Project.centroid.ST_AsGeoJSON().label("centroid"),
Project.organisation_id,
Project.tasks_bad_imagery,
Project.tasks_mapped,
Project.tasks_validated,
Project.status,
Project.total_tasks,
Project.last_updated,
Project.due_date,
Project.country,
Organisation.name.label("organisation_name"),
Organisation.logo.label("organisation_logo"),
)
.filter(Project.geometry is not None)
.outerjoin(Organisation, Organisation.id == Project.organisation_id)
.group_by(Organisation.id, Project.id)
)
# Get public projects only for anonymous user.
if user is None:
query = query.filter(Project.private.is_(False))
if user is not None and user.role != UserRole.ADMIN.value:
# Get also private projects of teams that the user is member.
project_ids = [[p.project_id for p in t.team.projects] for t in user.teams]
# Get projects that belong to user organizations.
orgs_projects_ids = [[p.id for p in u.projects] for u in user.organisations]
project_ids.extend(orgs_projects_ids)
project_ids = tuple(
set([item for sublist in project_ids for item in sublist])
)
query = query.filter(
or_(Project.private.is_(False), Project.id.in_(project_ids))
)
# If the user is admin, no filter.
return query
@staticmethod
def create_result_dto(project, preferred_locale, total_contributors):
project_info_dto = ProjectInfo.get_dto_for_locale(
project.id, preferred_locale, project.default_locale
)
list_dto = ListSearchResultDTO()
list_dto.project_id = project.id
list_dto.locale = project_info_dto.locale
list_dto.name = project_info_dto.name
list_dto.priority = ProjectPriority(project.priority).name
list_dto.mapper_level = MappingLevel(project.mapper_level).name
list_dto.short_description = project_info_dto.short_description
list_dto.last_updated = project.last_updated
list_dto.due_date = project.due_date
list_dto.percent_mapped = Project.calculate_tasks_percent(
"mapped",
project.total_tasks,
project.tasks_mapped,
project.tasks_validated,
project.tasks_bad_imagery,
)
list_dto.percent_validated = Project.calculate_tasks_percent(
"validated",
project.total_tasks,
project.tasks_mapped,
project.tasks_validated,
project.tasks_bad_imagery,
)
list_dto.status = ProjectStatus(project.status).name
list_dto.active_mappers = Project.get_active_mappers(project.id)
list_dto.total_contributors = total_contributors
list_dto.country = project.country
list_dto.organisation_name = project.organisation_name
list_dto.organisation_logo = project.organisation_logo
list_dto.campaigns = Project.get_project_campaigns(project.id)
return list_dto
@staticmethod
def get_total_contributions(paginated_results):
paginated_projects_ids = [p.id for p in paginated_results]
# We need to make a join to return projects without contributors.
project_contributors_count = (
Project.query.with_entities(
Project.id, func.count(distinct(TaskHistory.user_id)).label("total")
)
.filter(Project.id.in_(paginated_projects_ids))
.outerjoin(
TaskHistory,
and_(
TaskHistory.project_id == Project.id,
TaskHistory.action != "COMMENT",
),
)
.group_by(Project.id)
.all()
)
return [p.total for p in project_contributors_count]
@staticmethod
@cached(search_cache)
def search_projects(search_dto: ProjectSearchDTO, user) -> ProjectSearchResultsDTO:
""" Searches all projects for matches to the criteria provided by the user """
all_results, paginated_results = ProjectSearchService._filter_projects(
search_dto, user
)
if paginated_results.total == 0:
raise NotFound()
dto = ProjectSearchResultsDTO()
dto.results = [
ProjectSearchService.create_result_dto(
p,
search_dto.preferred_locale,
Project.get_project_total_contributions(p[0]),
)
for p in paginated_results.items
]
dto.pagination = Pagination(paginated_results)
if search_dto.omit_map_results:
return dto
features = []
for project in all_results:
# This loop creates a geojson feature collection so you can see all active projects on the map
properties = {
"projectId": project.id,
"priority": ProjectPriority(project.priority).name,
}
# centroid = project.centroid
feature = geojson.Feature(
geometry=geojson.loads(project.centroid), properties=properties
)
features.append(feature)
feature_collection = geojson.FeatureCollection(features)
dto.map_results = feature_collection
return dto
@staticmethod
def _filter_projects(search_dto: ProjectSearchDTO, user):
""" Filters all projects based on criteria provided by user"""
query = ProjectSearchService.create_search_query(user)
query = query.join(ProjectInfo).filter(
ProjectInfo.locale.in_([search_dto.preferred_locale, "en"])
)
project_status_array = []
if search_dto.project_statuses:
project_status_array = [
ProjectStatus[project_status].value
for project_status in search_dto.project_statuses
]
query = query.filter(Project.status.in_(project_status_array))
else:
if not search_dto.created_by:
project_status_array = [ProjectStatus.PUBLISHED.value]
query = query.filter(Project.status.in_(project_status_array))
if search_dto.interests:
query = query.join(
project_interests, project_interests.c.project_id == Project.id
).filter(project_interests.c.interest_id.in_(search_dto.interests))
if search_dto.created_by:
query = query.filter(Project.author_id == search_dto.created_by)
if search_dto.mapped_by:
projects_mapped = UserService.get_projects_mapped(search_dto.mapped_by)
query = query.filter(Project.id.in_(projects_mapped))
if search_dto.favorited_by:
projects_favorited = user.favorites
query = query.filter(
Project.id.in_([project.id for project in projects_favorited])
)
if search_dto.mapper_level and search_dto.mapper_level.upper() != "ALL":
query = query.filter(
Project.mapper_level == MappingLevel[search_dto.mapper_level].value
)
if search_dto.action and search_dto.action != "any":
if search_dto.action == "map":
query = ProjectSearchService.filter_projects_to_map(query, user)
if search_dto.action == "validate":
query = ProjectSearchService.filter_projects_to_validate(query, user)
if search_dto.organisation_name:
query = query.filter(Organisation.name == search_dto.organisation_name)
if search_dto.organisation_id:
query = query.filter(Organisation.id == search_dto.organisation_id)
if search_dto.team_id:
query = query.join(
ProjectTeams, ProjectTeams.project_id == Project.id
).filter(ProjectTeams.team_id == search_dto.team_id)
if search_dto.campaign:
query = query.join(Campaign, Project.campaign).group_by(Campaign.name)
query = query.filter(Campaign.name == search_dto.campaign)
if search_dto.mapping_types:
# Construct array of mapping types for query
mapping_type_array = []
if search_dto.mapping_types_exact:
mapping_type_array = [
{
MappingTypes[mapping_type].value
for mapping_type in search_dto.mapping_types
}
]
query = query.filter(Project.mapping_types.in_(mapping_type_array))
else:
mapping_type_array = [
MappingTypes[mapping_type].value
for mapping_type in search_dto.mapping_types
]
query = query.filter(Project.mapping_types.overlap(mapping_type_array))
if search_dto.text_search:
# We construct an OR search, so any projects that contain or more of the search terms should be returned
or_search = " | ".join(
[x for x in search_dto.text_search.split(" ") if x != ""]
)
opts = [
ProjectInfo.text_searchable.match(
or_search, postgresql_regconfig="english"
),
ProjectInfo.name.ilike(f"%{or_search}%"),
]
try:
opts.append(Project.id == int(search_dto.text_search))
except ValueError:
pass
query = query.filter(or_(*opts))
if search_dto.country:
# Unnest country column array.
sq = Project.query.with_entities(
Project.id, func.unnest(Project.country).label("country")
).subquery()
query = query.filter(
sq.c.country.ilike("%{}%".format(search_dto.country))
).filter(Project.id == sq.c.id)
if search_dto.last_updated_gte:
last_updated_gte = validate_date_input(search_dto.last_updated_gte)
query = query.filter(Project.last_updated >= last_updated_gte)
if search_dto.last_updated_lte:
last_updated_lte = validate_date_input(search_dto.last_updated_lte)
query = query.filter(Project.last_updated <= last_updated_lte)
if search_dto.created_gte:
created_gte = validate_date_input(search_dto.created_gte)
query = query.filter(Project.created >= created_gte)
if search_dto.created_lte:
created_lte = validate_date_input(search_dto.created_lte)
query = query.filter(Project.created <= created_lte)
order_by = search_dto.order_by
if search_dto.order_by_type == "DESC":
order_by = desc(search_dto.order_by)
query = query.order_by(order_by).distinct(search_dto.order_by, Project.id)
if search_dto.managed_by and user.role != UserRole.ADMIN.value:
# Get all the projects associated with the user and team.
orgs_projects_ids = [[p.id for p in u.projects] for u in user.organisations]
orgs_projects_ids = [
item for sublist in orgs_projects_ids for item in sublist
]
team_project_ids = [
[
p.project_id
for p in u.team.projects
if p.role == TeamRoles.PROJECT_MANAGER.value
]
for u in user.teams
]
team_project_ids = [
item for sublist in team_project_ids for item in sublist
]
orgs_projects_ids.extend(team_project_ids)
ids = tuple(set(orgs_projects_ids))
query = query.filter(Project.id.in_(ids))
all_results = []
if not search_dto.omit_map_results:
query_result = query
query_result.column_descriptions.clear()
query_result.add_column(Project.id)
query_result.add_column(Project.centroid.ST_AsGeoJSON().label("centroid"))
query_result.add_column(Project.priority)
all_results = query_result.all()
paginated_results = query.paginate(search_dto.page, 14, True)
return all_results, paginated_results
@staticmethod
def filter_by_user_permission(query, user, permission: str):
"""Filter projects a user can map or validate, based on their permissions."""
if user and user.role != UserRole.ADMIN.value:
if permission == "validation_permission":
permission_class = ValidationPermission
team_roles = [
TeamRoles.VALIDATOR.value,
TeamRoles.PROJECT_MANAGER.value,
]
else:
permission_class = MappingPermission
team_roles = [
TeamRoles.MAPPER.value,
TeamRoles.VALIDATOR.value,
TeamRoles.PROJECT_MANAGER.value,
]
selection = []
# get ids of projects assigned to the user's teams
[
[
selection.append(team_project.project_id)
for team_project in user_team.team.projects
if team_project.project_id not in selection
and team_project.role in team_roles
]
for user_team in user.teams
]
if user.mapping_level == MappingLevel.BEGINNER.value:
# if user is beginner, get only projects with ANY or TEAMS mapping permission
# in the later case, only those that are associated with user teams
query = query.filter(
or_(
and_(
Project.id.in_(selection),
getattr(Project, permission)
== permission_class.TEAMS.value,
),
getattr(Project, permission) == permission_class.ANY.value,
)
)
else:
# if user is intermediate or advanced, get projects with ANY or LEVEL permission
# and projects associated with user teams
query = query.filter(
or_(
Project.id.in_(selection),
getattr(Project, permission).in_(
[
permission_class.ANY.value,
permission_class.LEVEL.value,
]
),
)
)
return query
@staticmethod
def filter_projects_to_map(query, user):
"""Filter projects that needs mapping and can be mapped by the current user."""
query = query.filter(
Project.tasks_mapped + Project.tasks_validated
< Project.total_tasks - Project.tasks_bad_imagery
)
return ProjectSearchService.filter_by_user_permission(
query, user, "mapping_permission"
)
@staticmethod
def filter_projects_to_validate(query, user):
"""Filter projects that needs validation and can be validated by the current user."""
query = query.filter(
Project.tasks_validated < Project.total_tasks - Project.tasks_bad_imagery
)
return ProjectSearchService.filter_by_user_permission(
query, user, "validation_permission"
)
@staticmethod
def get_projects_geojson(
search_bbox_dto: ProjectSearchBBoxDTO,
) -> geojson.FeatureCollection:
"""Search for projects meeting the provided criteria. Returns a GeoJSON feature collection."""
# make a polygon from provided bounding box
polygon = ProjectSearchService._make_4326_polygon_from_bbox(
search_bbox_dto.bbox, search_bbox_dto.input_srid
)
# validate the bbox area is less than or equal to the max area allowed to prevent
# abuse of the api or performance issues from large requests
if not ProjectSearchService.validate_bbox_area(polygon):
raise BBoxTooBigError("Requested bounding box is too large")
# get projects intersecting the polygon for created by the author_id
intersecting_projects = ProjectSearchService._get_intersecting_projects(
polygon, search_bbox_dto.project_author
)
# allow an empty feature collection to be returned if no intersecting features found, since this is primarily
# for returning data to show on a map
features = []
for project in intersecting_projects:
try:
localDTO = ProjectInfo.get_dto_for_locale(
project.id, search_bbox_dto.preferred_locale, project.default_locale
)
except Exception:
pass
properties = {
"projectId": project.id,
"projectStatus": ProjectStatus(project.status).name,
"projectName": localDTO.name,
}
feature = geojson.Feature(
geometry=geojson.loads(project.geometry), properties=properties
)
features.append(feature)
return geojson.FeatureCollection(features)
@staticmethod
def _get_intersecting_projects(search_polygon: Polygon, author_id: int):
"""Executes a database query to get the intersecting projects created by the author if provided """
query = db.session.query(
Project.id,
Project.status,
Project.default_locale,
Project.geometry.ST_AsGeoJSON().label("geometry"),
).filter(
ST_Intersects(
Project.geometry,
ST_MakeEnvelope(
search_polygon.bounds[0],
search_polygon.bounds[1],
search_polygon.bounds[2],
search_polygon.bounds[3],
4326,
),
)
)
if author_id:
query = query.filter(Project.author_id == author_id)
return query.all()
@staticmethod
def _make_4326_polygon_from_bbox(bbox: list, srid: int) -> Polygon:
""" make a shapely Polygon in SRID 4326 from bbox and srid"""
try:
polygon = box(bbox[0], bbox[1], bbox[2], bbox[3])
if not srid == 4326:
geometry = shape.from_shape(polygon, srid)
geom_4326 = db.engine.execute(ST_Transform(geometry, 4326)).scalar()
polygon = shape.to_shape(geom_4326)
except Exception as e:
raise ProjectSearchServiceError(f"error making polygon: {e}")
return polygon
@staticmethod
def _get_area_sqm(polygon: Polygon) -> float:
""" get the area of the polygon in square metres """
return db.engine.execute(
ST_Area(ST_Transform(shape.from_shape(polygon, 4326), 3857))
).scalar()
@staticmethod
def validate_bbox_area(polygon: Polygon) -> bool:
""" check polygon does not exceed maximim allowed area"""
area = ProjectSearchService._get_area_sqm(polygon)
return area <= MAX_AREA