in microservices/assessment_service/src/routes/submitted_assessment.py [0:0]
def get_filtered_submitted_assessments(
        req: Request,
        sort_by: Optional[Literal["learner_name", "unit_name", "result",
                                  "attempt_no",
                                  "timer_start_time"]] = "timer_start_time",
        sort_order: Optional[Literal["ascending", "descending"]] = "ascending",
        name: Optional[str] = None,
        assessor_id: Optional[str] = None,
        is_autogradable: Union[bool, None] = Query(default=None),
        discipline_name: Union[List[str], None] = Query(default=None),
        unit_name: Union[List[str], None] = Query(default=None),
        status: Union[List[str], None] = Query(default=None),
        type: Union[List[str], None] = Query(default=None),
        result: Union[List[str], None] = Query(default=None),
        is_flagged: Union[bool, None] = Query(default=None),
        skip: int = Query(0, ge=0, le=2000),
        limit: int = Query(10, ge=1, le=100)):
    """
    The get filtered submitted assessments endpoint will return an array
    of submitted assessments from firestore
    ### Args:
    - sort_by (str): sort submitted assessment based on this parameter value
    - sort_order (str): ascending or descending sort
    - name (str): search submitted assessment based on assessment name keyword
      (case-insensitive substring match)
    - assessor_id (str): user id of an assessor, coach or instructor
    - is_autogradable (bool): to return autogradable or human gradable or both
      type of assessments
    - discipline_name (list): Pathway disciplines to filter submitted assessment
    - unit_name (list): Pathway units to filter submitted assessments on
    - type (list): type to filter submitted assessments on
    - status (list): status of submitted assessment to filter on
    - result (list): result to filter submitted assessments on
    - is_flagged (bool): return flagged assessments or not or both
    - skip (int): Number of objects to be skipped
    - limit (int): Size of array to be returned
    ### Raises:
    - ResourceNotFound: If a referenced resource is not found, or the user
      given by assessor_id is not an assessor, coach or instructor
    - BadRequest: If filters are incorrect (ValidationError)
    - InternalServerError: 500 Internal Server Error if something went wrong
    ### Returns:
    - AllSubmittedAssessmentAssessorResponseModel: List of SubmittedAssessments
    """
    try:
        header = {"Authorization": req.headers.get("authorization")}
        collection_manager = SubmittedAssessment.collection
        collection_manager = collection_manager.filter(
            "is_deleted", "==", False)

        # Only one "in" filter is pushed down to the query by the
        # status/type/learner branches below; the *_filter_applied flags
        # record which filters must instead be enforced in memory while
        # iterating the fetched documents.
        in_query_used = False
        status_filter_applied = True
        type_filter_applied = True
        learner_filter_applied = True

        # Bound up front so the per-record code below can safely test them
        # even when no assessor_id was supplied (previously `assessor` was
        # unbound in that case and the instructor lookup always failed).
        assessor = None
        learner_ids = []

        # assessor_id is a string because an assessor can either view only the
        # submissions assigned to him or all the submissions
        if assessor_id:
            assessor = User.find_by_user_id(assessor_id)
            if assessor.user_type == "assessor":
                collection_manager = collection_manager.filter(
                    "assessor_id", "==", assessor_id)
            elif assessor.user_type in ["coach", "instructor"]:
                ## TODO: Store user_id of the learner instead of user_id
                learner_ids = staff_to_learner_handler(header, assessor_id,
                                                       assessor.user_type)
                if not learner_ids:
                    ## FIXME: return statement should be executed in the end
                    ## only
                    # NOTE(review): "data" is a bare list here but a dict with
                    # "records"/"total_count" on the normal path - confirm
                    # which shape the response model expects.
                    return {
                        "success": True,
                        "message":
                            "Successfully fetched the submitted assessments.",
                        "data": []
                    }
                elif len(learner_ids) <= 30:
                    # presumably 30 is the datastore limit on "in" operands -
                    # TODO confirm
                    in_query_used = True
                    collection_manager = collection_manager.filter(
                        "learner_id", "in", learner_ids)
                else:
                    learner_filter_applied = False
            else:
                raise ResourceNotFoundException(
                    f"User of type {assessor.user_type} "
                    "not found.")

        if is_flagged is not None:
            collection_manager = collection_manager.filter(
                "is_flagged", "==", is_flagged)
        if is_autogradable is not None:
            collection_manager = collection_manager.filter(
                "is_autogradable", "==", is_autogradable)

        if result:
            if len(result) == 1:
                collection_manager = collection_manager.filter(
                    "result", "==", result[0])
            else:
                # NOTE(review): unlike status/type, this "in" filter is added
                # even if the learner_id "in" filter is already in use.
                in_query_used = True
                collection_manager = collection_manager.filter(
                    "result", "in", result)
        if status:
            if len(status) == 1:
                collection_manager = collection_manager.filter(
                    "status", "==", status[0])
            elif not in_query_used:
                in_query_used = True
                collection_manager = collection_manager.filter(
                    "status", "in", status)
            else:
                status_filter_applied = False
        if type:
            if len(type) == 1:
                collection_manager = collection_manager.filter(
                    "type", "==", type[0])
            elif not in_query_used:
                in_query_used = True
                collection_manager = collection_manager.filter(
                    "type", "in", type)
            else:
                type_filter_applied = False

        # check if the collection can be directly sorted if the field
        # to sort on is a field in the data model of SubmittedAssessment
        if sort_by in ["result", "attempt_no", "timer_start_time"]:
            sorted_collection = True
            if sort_order == "descending":
                sort_by = "-" + sort_by
            submitted_assessments = collection_manager.order(sort_by).fetch()
        else:
            sorted_collection = False
            submitted_assessments = collection_manager.fetch()

        filtered_submitted_assessments = []
        # Per-request caches so each assessment / learning experience /
        # discipline / instructor is resolved at most once on success.
        le_map = {}
        discipline_map = {}
        instructor_map = {}
        assessment_map = {}
        assessor_map = {}

        # iterate through all the submitted assessments
        for submitted_assessment in submitted_assessments:
            assessment_id = submitted_assessment.assessment_id
            # if the submitted assessments are already sorted, then we need to
            # fetch only limit number of values (limit is only decremented in
            # the sorted case)
            if not limit:
                break
            # Enforce in memory every filter that could not be pushed down
            # to the query above.
            if not status_filter_applied and \
                    submitted_assessment.status not in status:
                continue
            if not type_filter_applied and \
                    submitted_assessment.type not in type:
                continue
            if not learner_filter_applied and \
                    submitted_assessment.learner_id not in learner_ids:
                continue

            if assessment_id in assessment_map:
                assessment_node = assessment_map[assessment_id]
            else:
                assessment_node = Assessment.find_by_uuid(assessment_id)
                assessment_map[assessment_id] = assessment_node
            assessment_data = assessment_node.get_fields(
                reformat_datetime=True)
            assessment_name = assessment_data["name"]

            # The submission qualifies when no search keyword was given, or
            # the keyword occurs in the assessment name (case-insensitive).
            if name and name.lower() not in assessment_name.lower():
                continue

            # Resolve the parent learning experience (the "unit"); on any
            # failure fall back to an empty dict so the request still succeeds.
            try:
                if assessment_id in le_map:
                    le_data = le_map[assessment_id]
                else:
                    le_node = traverse_up_uuid(assessment_id, "assessments",
                                               "learning_experience")
                    le_data = le_node.get_fields()
                    le_map[assessment_id] = {
                        "name": le_data.get("name", ""),
                        "uuid": le_data.get("uuid")
                    }
            except Exception as e:
                Logger.error(e)
                Logger.error(traceback.format_exc())
                Logger.info("Passing Empty Dict to Get Success Response")
                le_data = {}

            # Resolve the parent discipline of that learning experience,
            # with the same best-effort fallback.
            try:
                le_id = le_data.get("uuid")
                if le_id in discipline_map:
                    discipline_data = discipline_map[le_id]
                else:
                    discipline_data = traverse_up_uuid(
                        le_id, "learning_experiences",
                        "discipline").get_fields()
                    discipline_map[le_id] = {
                        "name": discipline_data.get("name", ""),
                        "uuid": discipline_data.get("uuid")
                    }
            except Exception as e:
                Logger.error(e)
                Logger.error(traceback.format_exc())
                Logger.info("Passing Empty Dict to Get Success Response")
                discipline_data = {}

            # Keep the submission only if it passes both the unit_name and
            # the discipline_name filters (a missing filter always passes).
            unit_matches = (not unit_name
                            or le_data.get("name", "") in unit_name)
            discipline_matches = (
                not discipline_name
                or discipline_data.get("name", "") in discipline_name)
            if not (unit_matches and discipline_matches):
                continue

            # For an already sorted collection, pagination is done while
            # iterating: drop the first `skip` matching submissions.
            if sorted_collection and skip:
                skip -= 1
                continue

            submitted_assessment_data = get_submitted_assessment_data(
                submitted_assessment, False, None, assessment_node,
                assessor_map)
            submitted_assessment_data["unit_name"] = le_data.get("name", "")
            submitted_assessment_data["discipline_name"] = \
                discipline_data.get("name", "")

            # get instructor data
            try:
                discipline_id = discipline_data.get("uuid")
                if discipline_id in instructor_map:
                    instructor_data = instructor_map[discipline_id]
                else:
                    if assessor and assessor.user_type == "instructor":
                        # The requesting user is the instructor.
                        instructor_id = assessor.user_id
                    else:
                        instructor_id = instructor_handler(
                            header, discipline_data["uuid"])
                    instructor_data = User.find_by_user_id(
                        instructor_id).get_fields()
                    instructor_map[discipline_id] = {
                        "first_name": instructor_data.get("first_name", ""),
                        "last_name": instructor_data.get("last_name", ""),
                        "user_id": instructor_data.get("user_id")
                    }
                submitted_assessment_data["instructor_id"] = \
                    instructor_data.get("user_id", "")
                submitted_assessment_data["instructor_name"] = \
                    (instructor_data.get("first_name", "") + " " +
                     instructor_data.get("last_name", "")).lstrip()
            except Exception as e:
                # Best effort: a failed instructor lookup must not fail the
                # whole listing.
                Logger.error(e)
                Logger.error(traceback.format_exc())
                submitted_assessment_data["instructor_id"] = ""
                submitted_assessment_data["instructor_name"] = "Unassigned"

            filtered_submitted_assessments.append(submitted_assessment_data)
            if sorted_collection:
                limit -= 1

        # if the submitted assessments are not sorted, then sort it and return
        # only those submissions defined by skip and limit
        if not sorted_collection:
            # Direction comes from sort_order (the previous code compared
            # sort_by to "ascending", which never matched, so the result was
            # always descending).
            filtered_submitted_assessments = sorted(
                filtered_submitted_assessments,
                key=lambda i: i[sort_by],
                reverse=sort_order == "descending")
            filtered_submitted_assessments = \
                filtered_submitted_assessments[skip:skip + limit]

        # NOTE(review): total_count is a hard-coded placeholder, not the real
        # number of matching submissions.
        count = 10000
        response = {
            "records": filtered_submitted_assessments,
            "total_count": count
        }
        return {
            "success": True,
            "message": "Successfully fetched the submitted assessments.",
            "data": response
        }
    except ResourceNotFoundException as e:
        Logger.error(e)
        Logger.error(traceback.format_exc())
        raise ResourceNotFound(str(e)) from e
    except ValidationError as e:
        Logger.error(e)
        Logger.error(traceback.format_exc())
        raise BadRequest(str(e), data=e.data) from e
    except Exception as e:
        Logger.error(e)
        Logger.error(traceback.format_exc())
        raise InternalServerError(str(e)) from e