in microservices/assessment_service/src/routes/assessment.py [0:0]
def update_human_graded_assessment(uuid: str,
input_assessment: UpdateHumanGradedAssessmentModel):
"""
Update a human graded assessment
#### Args:
- input_assessment (AssessmentModel): Required body of
human graded assessment
### Raises:
- ResourceNotFoundException: If the assessment does not exist
- Exception: 500 Internal Server Error if something went wrong
### Returns:
- AssessmentModel: Updated Assessment Object
"""
try:
input_assessment_dict = {**input_assessment.dict()}
update_dict = copy.deepcopy(input_assessment_dict)
#get the list of performance indicators
performance_indicators = []
existing_assessment = get_assessment(uuid)
existing_rubrics = existing_assessment["data"].get(
"child_nodes", {}).get("rubrics", [])
updated_rubrics = []
if input_assessment_dict.get("child_nodes", {}):
updated_rubrics = input_assessment_dict.get("child_nodes").get("rubrics",
[])
if updated_rubrics:
new_rubric_list = []
new_rubric_criterion_list = []
for rubric_update in updated_rubrics:
#For existing rubrics
if "uuid" in rubric_update and\
rubric_update["uuid"] in existing_rubrics:
rubric = Rubric.find_by_uuid(rubric_update["uuid"])
rubric_data = rubric.get_fields(reformat_datetime=True)
existing_rubric_criteria =\
rubric_data["child_nodes"]["rubric_criteria"]
for rubric_criteria_update in\
rubric_update["child_nodes"]["rubric_criteria"]:
#For existing rubric_criteria
performance_indicators.extend(
rubric_criteria_update.get("performance_indicators",[]))
if "uuid" in rubric_criteria_update and\
rubric_criteria_update["uuid"] in existing_rubric_criteria:
print("Need to update existing rubric")
rubric_criteria_uuid = rubric_criteria_update["uuid"]
update_rubric_criteria_dict =\
UpdateRubricCriterionModel(**rubric_criteria_update)
update_criterion_response =\
update_rubric_criterion(rubric_criteria_uuid,\
update_rubric_criteria_dict)
new_rubric_criterion_list.append(
update_criterion_response["data"]["uuid"])
else:
#For a new rubric criteria
performance_indicators.extend(rubric_criteria_update.get(
"performance_indicators",[]))
new_rubric_criteria =\
BasicRubricCriterionModel(**rubric_criteria_update)
rubric_criterion_response =\
create_rubric_criterion(new_rubric_criteria)
new_rubric_criterion_list.append(
rubric_criterion_response["data"]["uuid"])
rubric_update["child_nodes"]["rubric_criteria"] =\
new_rubric_criterion_list
rubric_uuid = rubric_update["uuid"]
rubric_update_dict = UpdateRubricModel(**rubric_update)
update_rubric_response = update_rubric(
rubric_uuid, rubric_update_dict)
new_rubric_list.append(update_rubric_response["data"]["uuid"])
else:
#For new rubrics
#For new rubric criteria
new_rubric_criteria_list = []
for new_rubric_criteria_update in\
rubric_update["child_nodes"]["rubric_criteria"]:
performance_indicators.extend(new_rubric_criteria_update.get(
"performance_indicators",[]))
new_rubric_criteria =\
BasicRubricCriterionModel(**new_rubric_criteria_update)
rubric_criterion_response = create_rubric_criterion(
new_rubric_criteria)
new_rubric_criteria_list.append(
rubric_criterion_response["data"]["uuid"])
rubric_update["child_nodes"]["rubric_criteria"] =\
new_rubric_criteria_list
#Create the new rubric
new_rubric = RubricModel(**rubric_update)
rubric_response = create_rubric(new_rubric)
new_rubric_list.append(rubric_response["data"]["uuid"])
update_dict["child_nodes"]["rubrics"] = new_rubric_list
#Performance Indicators
# Validate if the skill ID exists in Firestore DB
unique_performance_indicators = set(performance_indicators)
for skill_id in unique_performance_indicators:
_ = Skill.find_by_uuid(skill_id)
update_dict["references"]["skills"] = unique_performance_indicators
assessment_fields = CommonAPIHandler.update_document(
Assessment, uuid, update_dict)
return {
"success": True,
"message": "Successfully updated the assessment",
"data": assessment_fields
}
except ResourceNotFoundException as e:
Logger.error(e)
Logger.error(traceback.print_exc())
raise ResourceNotFound(str(e)) from e
except Exception as e:
Logger.error(e)
Logger.error(traceback.print_exc())
raise InternalServerError(str(e)) from e