backend/services/stats_service.py (585 lines of code) (raw):
from cachetools import TTLCache, cached
from datetime import date, timedelta
from sqlalchemy import func, desc, cast, extract, or_, tuple_
from sqlalchemy.sql.functions import coalesce
from sqlalchemy.types import Time
from backend import db
from backend.models.dtos.stats_dto import (
ProjectContributionsDTO,
UserContribution,
Pagination,
TaskHistoryDTO,
TaskStatusDTO,
ProjectActivityDTO,
ProjectLastActivityDTO,
HomePageStatsDTO,
OrganizationListStatsDTO,
CampaignStatsDTO,
TaskStats,
TaskStatsDTO,
GenderStatsDTO,
UserStatsDTO,
)
from backend.models.dtos.project_dto import ProjectSearchResultsDTO
from backend.models.postgis.campaign import Campaign, campaign_projects
from backend.models.postgis.organisation import Organisation
from backend.models.postgis.project import Project
from backend.models.postgis.statuses import TaskStatus, MappingLevel, UserGender
from backend.models.postgis.task import TaskHistory, User, Task, TaskAction
from backend.models.postgis.utils import timestamp, NotFound # noqa: F401
from backend.services.project_service import ProjectService
from backend.services.project_search_service import ProjectSearchService
from backend.services.users.user_service import UserService
from backend.services.organisation_service import OrganisationService
from backend.services.campaign_service import CampaignService
homepage_stats_cache = TTLCache(maxsize=4, ttl=30)
class StatsService:
@staticmethod
def update_stats_after_task_state_change(
project_id: int,
user_id: int,
last_state: TaskStatus,
new_state: TaskStatus,
action="change",
):
""" Update stats when a task has had a state change """
if new_state in [
TaskStatus.LOCKED_FOR_VALIDATION,
TaskStatus.LOCKED_FOR_MAPPING,
]:
return # No stats to record for these states
project = ProjectService.get_project_by_id(project_id)
user = UserService.get_user_by_id(user_id)
project, user = StatsService._update_tasks_stats(
project, user, last_state, new_state, action
)
UserService.upsert_mapped_projects(user_id, project_id)
project.last_updated = timestamp()
# Transaction will be saved when task is saved
return project, user
@staticmethod
def _update_tasks_stats(
project: Project,
user: User,
last_state: TaskStatus,
new_state: TaskStatus,
action="change",
):
# Make sure you are aware that users table has it as incrementing counters,
# while projects table reflect the actual state, and both increment and decrement happens
if new_state == last_state:
return project, user
# Set counters for new state
if new_state == TaskStatus.MAPPED:
project.tasks_mapped += 1
elif new_state == TaskStatus.VALIDATED:
project.tasks_validated += 1
elif new_state == TaskStatus.BADIMAGERY:
project.tasks_bad_imagery += 1
if action == "change":
if new_state == TaskStatus.MAPPED:
user.tasks_mapped += 1
elif new_state == TaskStatus.VALIDATED:
user.tasks_validated += 1
elif new_state == TaskStatus.INVALIDATED:
user.tasks_invalidated += 1
# Remove counters for old state
if last_state == TaskStatus.MAPPED:
project.tasks_mapped -= 1
elif last_state == TaskStatus.VALIDATED:
project.tasks_validated -= 1
elif last_state == TaskStatus.BADIMAGERY:
project.tasks_bad_imagery -= 1
if action == "undo":
if last_state == TaskStatus.MAPPED:
user.tasks_mapped -= 1
elif last_state == TaskStatus.VALIDATED:
user.tasks_validated -= 1
elif last_state == TaskStatus.INVALIDATED:
user.tasks_invalidated -= 1
return project, user
@staticmethod
def get_latest_activity(project_id: int, page: int) -> ProjectActivityDTO:
""" Gets all the activity on a project """
if not ProjectService.exists(project_id):
raise NotFound
results = (
db.session.query(
TaskHistory.id,
TaskHistory.task_id,
TaskHistory.action,
TaskHistory.action_date,
TaskHistory.action_text,
User.username,
)
.join(User)
.filter(
TaskHistory.project_id == project_id,
TaskHistory.action != TaskAction.COMMENT.name,
)
.order_by(TaskHistory.action_date.desc())
.paginate(page, 10, True)
)
activity_dto = ProjectActivityDTO()
for item in results.items:
history = TaskHistoryDTO()
history.task_id = item.id
history.task_id = item.task_id
history.action = item.action
history.action_text = item.action_text
history.action_date = item.action_date
history.action_by = item.username
activity_dto.activity.append(history)
activity_dto.pagination = Pagination(results)
return activity_dto
@staticmethod
def get_popular_projects() -> ProjectSearchResultsDTO:
""" Get all projects ordered by task_history """
rate_func = func.count(TaskHistory.user_id) / extract(
"epoch", func.sum(cast(TaskHistory.action_date, Time))
)
query = (
TaskHistory.query.with_entities(
TaskHistory.project_id.label("id"), rate_func.label("rate")
)
.filter(TaskHistory.action_date >= date.today() - timedelta(days=90))
.filter(
or_(
TaskHistory.action == TaskAction.LOCKED_FOR_MAPPING.name,
TaskHistory.action == TaskAction.LOCKED_FOR_VALIDATION.name,
)
)
.filter(TaskHistory.action_text is not None)
.filter(TaskHistory.action_text != "")
.group_by(TaskHistory.project_id)
.order_by(desc("rate"))
.limit(10)
.subquery()
)
projects_query = ProjectSearchService.create_search_query()
projects = projects_query.filter(Project.id == query.c.id).all()
# Get total contributors.
contrib_counts = ProjectSearchService.get_total_contributions(projects)
zip_items = zip(projects, contrib_counts)
dto = ProjectSearchResultsDTO()
dto.results = [
ProjectSearchService.create_result_dto(p, "en", t) for p, t in zip_items
]
return dto
@staticmethod
def get_last_activity(project_id: int) -> ProjectLastActivityDTO:
""" Gets the last activity for a project's tasks """
sq = (
TaskHistory.query.with_entities(
TaskHistory.task_id,
TaskHistory.action_date,
TaskHistory.user_id,
)
.filter(TaskHistory.project_id == project_id)
.filter(TaskHistory.action != TaskAction.COMMENT.name)
.order_by(TaskHistory.task_id, TaskHistory.action_date.desc())
.distinct(TaskHistory.task_id)
.subquery()
)
sq_statuses = (
Task.query.with_entities(Task.id, Task.task_status)
.filter(Task.project_id == project_id)
.subquery()
)
results = (
db.session.query(
sq_statuses.c.id,
sq.c.action_date,
sq_statuses.c.task_status,
User.username,
)
.outerjoin(sq, sq.c.task_id == sq_statuses.c.id)
.outerjoin(User, User.id == sq.c.user_id)
.order_by(sq_statuses.c.id)
.all()
)
dto = ProjectLastActivityDTO()
dto.activity = [
TaskStatusDTO(
dict(
task_id=r.id,
task_status=TaskStatus(r.task_status).name,
action_date=r.action_date,
action_by=r.username,
)
)
for r in results
]
return dto
@staticmethod
def get_user_contributions(project_id: int) -> ProjectContributionsDTO:
""" Get all user contributions on a project"""
mapped_stmt = (
Task.query.with_entities(
Task.mapped_by,
func.count(Task.mapped_by).label("count"),
func.array_agg(Task.id).label("task_ids"),
)
.filter(Task.project_id == project_id)
.group_by(Task.mapped_by)
.subquery()
)
validated_stmt = (
Task.query.with_entities(
Task.validated_by,
func.count(Task.validated_by).label("count"),
func.array_agg(Task.id).label("task_ids"),
)
.filter(Task.project_id == project_id)
.group_by(Task.validated_by)
.subquery()
)
results = (
db.session.query(
User.id,
User.username,
User.name,
User.mapping_level,
User.picture_url,
User.date_registered,
coalesce(mapped_stmt.c.count, 0).label("mapped"),
coalesce(validated_stmt.c.count, 0).label("validated"),
(
coalesce(mapped_stmt.c.count, 0)
+ coalesce(validated_stmt.c.count, 0)
).label("total"),
mapped_stmt.c.task_ids.label("mapped_tasks"),
validated_stmt.c.task_ids.label("validated_tasks"),
)
.outerjoin(
validated_stmt,
mapped_stmt.c.mapped_by == validated_stmt.c.validated_by,
full=True,
)
.join(
User,
or_(
User.id == mapped_stmt.c.mapped_by,
User.id == validated_stmt.c.validated_by,
),
)
.order_by(desc("total"))
.all()
)
contrib_dto = ProjectContributionsDTO()
user_contributions = [
UserContribution(
dict(
username=r.username,
name=r.name,
mapping_level=MappingLevel(r.mapping_level).name,
picture_url=r.picture_url,
mapped=r.mapped,
validated=r.validated,
total=r.total,
mapped_tasks=r.mapped_tasks if r.mapped_tasks is not None else [],
validated_tasks=r.validated_tasks
if r.validated_tasks is not None
else [],
date_registered=r.date_registered.date(),
)
)
for r in results
]
contrib_dto.user_contributions = user_contributions
return contrib_dto
@staticmethod
@cached(homepage_stats_cache)
def get_homepage_stats(abbrev=True) -> HomePageStatsDTO:
""" Get overall TM stats to give community a feel for progress that's being made """
dto = HomePageStatsDTO()
dto.total_projects = Project.query.with_entities(
func.count(Project.id)
).scalar()
dto.mappers_online = (
Task.query.with_entities(func.count(Task.locked_by.distinct()))
.filter(Task.locked_by.isnot(None))
.scalar()
)
dto.total_mappers = User.query.with_entities(func.count(User.id)).scalar()
dto.tasks_mapped = (
Task.query.with_entities(func.count())
.filter(
Task.task_status.in_(
(TaskStatus.MAPPED.value, TaskStatus.VALIDATED.value)
)
)
.scalar()
)
if not abbrev:
dto.total_validators = (
Task.query.filter(Task.task_status == TaskStatus.VALIDATED.value)
.distinct(Task.validated_by)
.count()
)
dto.tasks_validated = Task.query.filter(
Task.task_status == TaskStatus.VALIDATED.value
).count()
dto.total_area = Project.query.with_entities(
func.coalesce(func.sum(func.ST_Area(Project.geometry, True) / 1000000))
).scalar()
dto.total_mapped_area = (
Task.query.with_entities(
func.coalesce(func.sum(func.ST_Area(Task.geometry, True) / 1000000))
)
.filter(Task.task_status == TaskStatus.MAPPED.value)
.scalar()
)
dto.total_validated_area = (
Task.query.with_entities(
func.coalesce(func.sum(func.ST_Area(Task.geometry, True) / 1000000))
)
.filter(Task.task_status == TaskStatus.VALIDATED.value)
.scalar()
)
unique_campaigns = Campaign.query.with_entities(
func.count(Campaign.id)
).scalar()
linked_campaigns_count = (
Campaign.query.join(
campaign_projects, Campaign.id == campaign_projects.c.campaign_id
)
.with_entities(
Campaign.name, func.count(campaign_projects.c.campaign_id)
)
.group_by(Campaign.id)
.all()
)
subquery = (
db.session.query(campaign_projects.c.project_id.distinct())
.order_by(campaign_projects.c.project_id)
.subquery()
)
no_campaign_count = (
Project.query.with_entities(func.count())
.filter(~Project.id.in_(subquery))
.scalar()
)
dto.campaigns = [CampaignStatsDTO(row) for row in linked_campaigns_count]
if no_campaign_count:
dto.campaigns.append(
CampaignStatsDTO(("Unassociated", no_campaign_count))
)
dto.total_campaigns = unique_campaigns
unique_orgs = Organisation.query.with_entities(
func.count(Organisation.id)
).scalar()
linked_orgs_count = (
db.session.query(Organisation.name, func.count(Project.organisation_id))
.join(Project.organisation)
.group_by(Organisation.id)
.all()
)
subquery = (
db.session.query(Project.organisation_id.distinct())
.order_by(Project.organisation_id)
.subquery()
)
no_org_project_count = (
Organisation.query.with_entities(func.count())
.filter(~Organisation.id.in_(subquery))
.scalar()
)
dto.organisations = [
OrganizationListStatsDTO(row) for row in linked_orgs_count
]
if no_org_project_count:
no_org_proj = OrganizationListStatsDTO(
("Unassociated", no_org_project_count)
)
dto.organisations.append(no_org_proj)
dto.total_organisations = unique_orgs
else:
# Clear null attributes for abbreviated call
clear_attrs = [
"total_validators",
"tasks_validated",
"total_area",
"total_mapped_area",
"total_validated_area",
"campaigns",
"total_campaigns",
"organisations",
"total_organisations",
]
for attr in clear_attrs:
delattr(dto, attr)
return dto
@staticmethod
def update_all_project_stats():
projects = db.session.query(Project.id)
for project_id in projects.all():
StatsService.update_project_stats(project_id)
@staticmethod
def update_project_stats(project_id: int):
project = ProjectService.get_project_by_id(project_id)
tasks = Task.query.filter(Task.project_id == project_id)
project.total_tasks = tasks.count()
project.tasks_mapped = tasks.filter(
Task.task_status == TaskStatus.MAPPED.value
).count()
project.tasks_validated = tasks.filter(
Task.task_status == TaskStatus.VALIDATED.value
).count()
project.tasks_bad_imagery = tasks.filter(
Task.task_status == TaskStatus.BADIMAGERY.value
).count()
project.save()
@staticmethod
def get_all_users_statistics(start_date: date, end_date: date):
users = User.query.filter(
User.date_registered >= start_date,
User.date_registered <= end_date,
)
stats_dto = UserStatsDTO()
stats_dto.total = users.count()
stats_dto.beginner = users.filter(
User.mapping_level == MappingLevel.BEGINNER.value
).count()
stats_dto.intermediate = users.filter(
User.mapping_level == MappingLevel.INTERMEDIATE.value
).count()
stats_dto.advanced = users.filter(
User.mapping_level == MappingLevel.ADVANCED.value
).count()
stats_dto.contributed = users.filter(User.projects_mapped.isnot(None)).count()
stats_dto.email_verified = users.filter(
User.is_email_verified.is_(True)
).count()
gender_stats = GenderStatsDTO()
gender_stats.male = users.filter(User.gender == UserGender.MALE.value).count()
gender_stats.female = users.filter(
User.gender == UserGender.FEMALE.value
).count()
gender_stats.self_describe = users.filter(
User.gender == UserGender.SELF_DESCRIBE.value
).count()
gender_stats.prefer_not = users.filter(
User.gender == UserGender.PREFER_NOT.value
).count()
stats_dto.genders = gender_stats
return stats_dto
@staticmethod
def set_task_stats(result_row):
date_dto = TaskStats(
{
"date": result_row[0],
"mapped": result_row[1],
"validated": result_row[2],
"bad_imagery": result_row[3],
}
)
return date_dto
@staticmethod
def get_task_stats(
start_date, end_date, org_id, org_name, campaign, project_id, country
):
""" Creates tasks stats for a period using the TaskStatsDTO """
query = (
db.session.query(
TaskHistory.task_id,
TaskHistory.project_id,
TaskHistory.action_text,
func.DATE(TaskHistory.action_date).label("day"),
)
.distinct(
tuple_(
TaskHistory.project_id, TaskHistory.task_id, TaskHistory.action_text
)
)
.filter(
TaskHistory.action == "STATE_CHANGE",
or_(
TaskHistory.action_text == "MAPPED",
TaskHistory.action_text == "VALIDATED",
TaskHistory.action_text == "BADIMAGERY",
),
)
.order_by(
TaskHistory.project_id,
TaskHistory.task_id,
TaskHistory.action_text,
TaskHistory.action_date,
)
)
if org_id:
query = query.join(Project, Project.id == TaskHistory.project_id).filter(
Project.organisation_id == org_id
)
if org_name:
try:
organisation_id = OrganisationService.get_organisation_by_name(
org_name
).id
except NotFound:
organisation_id = None
query = query.join(Project, Project.id == TaskHistory.project_id).filter(
Project.organisation_id == organisation_id
)
if campaign:
try:
campaign_id = CampaignService.get_campaign_by_name(campaign).id
except NotFound:
campaign_id = None
query = query.join(
campaign_projects,
campaign_projects.c.project_id == TaskHistory.project_id,
).filter(campaign_projects.c.campaign_id == campaign_id)
if project_id:
query = query.filter(TaskHistory.project_id.in_(project_id))
if country:
# Unnest country column array.
sq = Project.query.with_entities(
Project.id, func.unnest(Project.country).label("country")
).subquery()
query = query.filter(sq.c.country.ilike("%{}%".format(country))).filter(
TaskHistory.project_id == sq.c.id
)
query = query.subquery()
date_query = db.session.query(
func.DATE(
func.generate_series(start_date, end_date, timedelta(days=1))
).label("d_day")
).subquery()
grouped_dates = (
db.session.query(
date_query.c.d_day,
query.c.action_text,
func.count(query.c.action_text).label("cnt"),
)
.join(date_query, date_query.c.d_day == query.c.day)
.group_by(date_query.c.d_day, query.c.action_text)
.order_by(date_query.c.d_day)
).subquery()
mapped = (
db.session.query(
grouped_dates.c.d_day, grouped_dates.c.action_text, grouped_dates.c.cnt
)
.select_from(grouped_dates)
.filter(grouped_dates.c.action_text == "MAPPED")
.subquery()
)
validated = (
db.session.query(
grouped_dates.c.d_day, grouped_dates.c.action_text, grouped_dates.c.cnt
)
.select_from(grouped_dates)
.filter(grouped_dates.c.action_text == "VALIDATED")
.subquery()
)
badimagery = (
db.session.query(
grouped_dates.c.d_day, grouped_dates.c.action_text, grouped_dates.c.cnt
)
.select_from(grouped_dates)
.filter(grouped_dates.c.action_text == "BADIMAGERY")
.subquery()
)
result = (
db.session.query(
func.to_char(grouped_dates.c.d_day, "YYYY-MM-DD"),
func.coalesce(mapped.c.cnt, 0).label("mapped"),
func.coalesce(validated.c.cnt, 0).label("validated"),
func.coalesce(badimagery.c.cnt, 0).label("badimagery"),
)
.select_from(grouped_dates)
.distinct(grouped_dates.c.d_day)
.filter(grouped_dates.c.d_day is not None)
.outerjoin(mapped, mapped.c.d_day == grouped_dates.c.d_day)
.outerjoin(validated, validated.c.d_day == grouped_dates.c.d_day)
.outerjoin(badimagery, badimagery.c.d_day == grouped_dates.c.d_day)
)
day_stats_dto = list(map(StatsService.set_task_stats, result))
results_dto = TaskStatsDTO()
results_dto.stats = day_stats_dto
return results_dto