in backend/services/stats_service.py [0:0]
def get_user_contributions(project_id: int) -> ProjectContributionsDTO:
""" Get all user contributions on a project"""
mapped_stmt = (
Task.query.with_entities(
Task.mapped_by,
func.count(Task.mapped_by).label("count"),
func.array_agg(Task.id).label("task_ids"),
)
.filter(Task.project_id == project_id)
.group_by(Task.mapped_by)
.subquery()
)
validated_stmt = (
Task.query.with_entities(
Task.validated_by,
func.count(Task.validated_by).label("count"),
func.array_agg(Task.id).label("task_ids"),
)
.filter(Task.project_id == project_id)
.group_by(Task.validated_by)
.subquery()
)
results = (
db.session.query(
User.id,
User.username,
User.name,
User.mapping_level,
User.picture_url,
User.date_registered,
coalesce(mapped_stmt.c.count, 0).label("mapped"),
coalesce(validated_stmt.c.count, 0).label("validated"),
(
coalesce(mapped_stmt.c.count, 0)
+ coalesce(validated_stmt.c.count, 0)
).label("total"),
mapped_stmt.c.task_ids.label("mapped_tasks"),
validated_stmt.c.task_ids.label("validated_tasks"),
)
.outerjoin(
validated_stmt,
mapped_stmt.c.mapped_by == validated_stmt.c.validated_by,
full=True,
)
.join(
User,
or_(
User.id == mapped_stmt.c.mapped_by,
User.id == validated_stmt.c.validated_by,
),
)
.order_by(desc("total"))
.all()
)
contrib_dto = ProjectContributionsDTO()
user_contributions = [
UserContribution(
dict(
username=r.username,
name=r.name,
mapping_level=MappingLevel(r.mapping_level).name,
picture_url=r.picture_url,
mapped=r.mapped,
validated=r.validated,
total=r.total,
mapped_tasks=r.mapped_tasks if r.mapped_tasks is not None else [],
validated_tasks=r.validated_tasks
if r.validated_tasks is not None
else [],
date_registered=r.date_registered.date(),
)
)
for r in results
]
contrib_dto.user_contributions = user_contributions
return contrib_dto