in backend/services/users/user_service.py [0:0]
def get_detailed_stats(username: str):
    """Build the detailed statistics DTO for the user with the given username.

    The user is looked up through ``UserService.get_user_by_username``. Task
    history rows whose ``action_text`` is VALIDATED, INVALIDATED or MAPPED are
    counted per action for that user, and VALIDATED / INVALIDATED rows written
    by other users are counted for the project/task pairs that also appear in
    the user's own rows. The remaining fields are filled from other
    ``UserService`` helpers and from two time-spent aggregates.

    :param username: name used to look up the user.
    :return: a populated ``UserStatsDTO``.
    """
    user = UserService.get_user_by_username(username)
    user_id = user.id
    stats_dto = UserStatsDTO()

    validated = TaskStatus.VALIDATED.name
    invalidated = TaskStatus.INVALIDATED.name
    mapped = TaskStatus.MAPPED.name
    counted_actions = [validated, invalidated, mapped]

    # One-column relation listing the three action names. Outer-joining the
    # history rows onto it yields a row for every action, including those
    # with a zero count.
    validated_row = db.session.query(literal(validated).label("action_text"))
    invalidated_row = db.session.query(literal(invalidated).label("action_text"))
    mapped_row = db.session.query(literal(mapped).label("action_text"))
    action_names = (
        validated_row.union(invalidated_row, mapped_row)
        .subquery()
        .alias("actions_table")
    )

    # Task history restricted to the three counted actions.
    history_columns = (
        TaskHistory.user_id,
        TaskHistory.project_id,
        TaskHistory.task_id,
        TaskHistory.action_text,
    )
    relevant_history = (
        TaskHistory.query.with_entities(*history_columns)
        .filter(TaskHistory.action_text.in_(counted_actions))
        .subquery()
        .alias("filtered_actions")
    )

    # Rows written by the requested user.
    own_rows = (
        db.session.query(relevant_history)
        .filter(relevant_history.c.user_id == user_id)
        .subquery()
        .alias("user_tasks")
    )

    # Non-MAPPED rows written by anyone else on a project/task pair that also
    # appears in the user's own rows.
    # NOTE(review): own_rows is only referenced in the filter criteria, so
    # this looks like an implicit join that repeats a row once per matching
    # own row -- confirm the resulting counts are the intended ones.
    other_rows = (
        db.session.query(relevant_history)
        .filter(relevant_history.c.user_id != user_id)
        .filter(relevant_history.c.task_id == own_rows.c.task_id)
        .filter(relevant_history.c.project_id == own_rows.c.project_id)
        .filter(relevant_history.c.action_text != mapped)
        .subquery()
        .alias("others_tasks")
    )

    action_name = action_names.c.action_text
    own_counts = (
        db.session.query(action_name, func.count(own_rows.c.action_text))
        .outerjoin(own_rows, action_name == own_rows.c.action_text)
        .group_by(action_name)
    )
    # Same shape as own_counts, with "_BY_OTHERS" appended to each action
    # name so both result sets can share one dictionary.
    others_counts = (
        db.session.query(
            func.concat(action_name, "_BY_OTHERS"),
            func.count(other_rows.c.action_text),
        )
        .outerjoin(other_rows, action_name == other_rows.c.action_text)
        .group_by(action_name)
    )

    # Both count queries are sent to the database as a single UNION.
    results = {}
    for label, total in own_counts.union(others_counts).all():
        results[label] = total

    projects_mapped = UserService.get_projects_mapped(user_id)
    stats_dto.tasks_mapped = results["MAPPED"]
    stats_dto.tasks_validated = results["VALIDATED"]
    stats_dto.tasks_invalidated = results["INVALIDATED"]
    stats_dto.tasks_validated_by_others = results["VALIDATED_BY_OTHERS"]
    stats_dto.tasks_invalidated_by_others = results["INVALIDATED_BY_OTHERS"]
    stats_dto.projects_mapped = len(projects_mapped)
    stats_dto.countries_contributed = UserService.get_countries_contributed(user_id)
    stats_dto.contributions_by_day = UserService.get_contributions_by_day(user_id)

    # Time fields default to zero and are only overwritten when the matching
    # aggregate returns a truthy value.
    stats_dto.time_spent_validating = 0
    stats_dto.time_spent_mapping = 0
    stats_dto.total_time_spent = 0

    # Validation locks are grouped by the minute of action_date, keeping the
    # maximum action_text of each group.
    # NOTE(review): presumably action_text holds an HH:MM:SS duration for
    # lock actions, since it is parsed with "HH24:MI:SS" -- confirm.
    minute_bucket = func.date_trunc("minute", TaskHistory.action_date).label("trn")
    max_text = func.max(TaskHistory.action_text).label("tm")
    validation_locks = (
        TaskHistory.query.with_entities(minute_bucket, max_text)
        .filter(TaskHistory.user_id == user_id)
        .filter(TaskHistory.action == "LOCKED_FOR_VALIDATION")
        .group_by("trn")
        .subquery()
    )
    validation_sum = func.sum(
        cast(func.to_timestamp(validation_locks.c.tm, "HH24:MI:SS"), Time)
    )
    total_validation_time = db.session.query(validation_sum).scalar()
    if total_validation_time:
        stats_dto.time_spent_validating = total_validation_time.total_seconds()
        stats_dto.total_time_spent += stats_dto.time_spent_validating

    # Mapping time sums every matching row directly, without the per-minute
    # grouping used for validation.
    is_mapping_lock = or_(
        TaskHistory.action == TaskAction.LOCKED_FOR_MAPPING.name,
        TaskHistory.action == TaskAction.AUTO_UNLOCKED_FOR_MAPPING.name,
    )
    mapping_sum = func.sum(
        cast(func.to_timestamp(TaskHistory.action_text, "HH24:MI:SS"), Time)
    )
    total_mapping_time = (
        db.session.query(mapping_sum)
        .filter(is_mapping_lock)
        .filter(TaskHistory.user_id == user_id)
        .scalar()
    )
    if total_mapping_time:
        stats_dto.time_spent_mapping = total_mapping_time.total_seconds()
        stats_dto.total_time_spent += stats_dto.time_spent_mapping

    stats_dto.contributions_interest = UserService.get_interests_stats(user_id)
    return stats_dto