in libs/libcommon/src/libcommon/queue/jobs.py [0:0]
def get_jobs_total_by_type_status_and_dataset_status(self) -> JobsTotalByTypeStatusAndDatasetStatus:
    """Tally the jobs for every (job type, status, dataset status) combination.

    The dataset status is not read from the job documents: it is computed inside the
    aggregation pipeline. A job whose `dataset` field is in the list returned by
    `get_blocked_datasets()` is tagged DATASET_STATUS_BLOCKED, any other job is tagged
    DATASET_STATUS_NORMAL.

    Returns:
        a mapping whose keys are tuples (job_type, status, dataset_status) and whose
        values are the number of jobs found for that combination.
    """
    blocked = get_blocked_datasets()

    # Stage 1: order the jobs by type, then status, before the later stages run.
    order_stage = {"$sort": {"type": 1, "status": 1}}

    # Stage 2: tag every job with the status of its dataset (blocked or normal).
    # TODO(SL): optimize this part?
    is_blocked = {"$in": ["$dataset", blocked]}
    tag_stage = {
        "$addFields": {
            "dataset_status": {
                "$cond": {
                    "if": is_blocked,
                    "then": DATASET_STATUS_BLOCKED,
                    "else": DATASET_STATUS_NORMAL,
                }
            }
        }
    }

    # Stage 3: one bucket per (type, status, dataset_status), counting its jobs.
    bucket_id = {"type": "$type", "status": "$status", "dataset_status": "$dataset_status"}
    group_stage = {"$group": {"_id": bucket_id, "total": {"$sum": 1}}}

    # Stage 4: flatten the compound `_id` into top-level fields.
    project_stage = {
        "$project": {
            "job_type": "$_id.type",
            "status": "$_id.status",
            "dataset_status": "$_id.dataset_status",
            "total": "$total",
        }
    }

    pipeline = [order_stage, tag_stage, group_stage, project_stage]

    totals: JobsTotalByTypeStatusAndDatasetStatus = {}
    for row in JobDocument.objects().aggregate(pipeline):
        key = (row["job_type"], row["status"], row["dataset_status"])
        totals[key] = row["total"]
    return totals