microservices/course_ingestion/services/competency_inference.py (190 lines of code) (raw):
"""CRUD for Competency"""
from common.models import Course, Competency, LearningContentItem
from common.utils.cache_service import delete_key
from common.utils.errors import ResourceNotFoundException
#pylint: disable=redefined-builtin,broad-exception-raised,protected-access
class CompetencyService():
"""Class for Competency"""
def create_course_competency(self, course_id, competency):
"""creates a competency"""
comp = Competency()
for key, value in competency.items():
setattr(comp, key, value)
parent = Course.find_by_id(course_id)
if parent:
comp.save()
parent.competency_ids.append(comp.id)
parent.save()
return self.get_course_competency(comp.id, parent.id)
def get_course_competency(self, id, parent_id):
"""returns competency given id"""
course = Course.find_by_id(parent_id)
competency = Competency.find_by_id(id)
if competency:
if not id in course.competency_ids:
raise ResourceNotFoundException(
"This competency is not associated with current course")
try:
competency_item = competency.get_fields(reformat_datetime=True)
competency_item["id"] = competency.id
return competency_item
except (TypeError, KeyError) as e:
raise Exception("Failed to fetch competency") from e
else:
raise ResourceNotFoundException("Competency with this ID does not exists")
def get_course_all_competencies(self, course_id):
"""returns all competencies given course"""
competency_list = []
course = Course.find_by_id(course_id)
try:
course.load_children()
for competency in course.competencies:
competency_item = competency.get_fields(reformat_datetime=True)
competency_item["id"] = competency.id
competency_list.append(competency_item)
return competency_list
except (TypeError, KeyError) as e:
raise Exception("Failed to fetch all competencies") from e
def update_course_competency(self, id, competency, parent_id):
"""updates all fields and/or parent of a competency"""
course = Course.find_by_id(parent_id)
comp = Competency.find_by_id(id)
if not id in course.competency_ids:
raise ResourceNotFoundException(
"This competency is not associated with current course")
try:
competency_fields = comp.get_fields()
for key, value in competency.items():
competency_fields[key] = value
for key, value in competency_fields.items():
setattr(comp, key, value)
comp.update()
return self.get_competency(comp.id)
except (TypeError, KeyError) as e:
raise Exception("Failed to update the competency") from e
def delete_course_competency(self, id, parent_id):
"""deletes a competency"""
competency = Competency.find_by_id(id)
if competency:
# competency.delete_tree()
parent = Course.find_by_id(parent_id)
if id in parent.competency_ids:
parent.competency_ids.remove(id)
parent.save()
else:
raise ResourceNotFoundException(
"Course does not have this competency associated")
def create_competency(self, competency):
"""creates a competency"""
comp = Competency()
for key, value in competency.items():
setattr(comp, key, value)
comp.save()
return self.get_competency(comp.id)
def get_competency(self, id, is_text_required=False):
"""returns competency given id"""
competency = Competency.find_by_id(id)
try:
competency_item = competency.get_fields(reformat_datetime=True)
competency_item["id"] = competency.id
if is_text_required:
competency.load_children()
competency_text = []
for sub_competency in competency.sub_competencies:
sub_competency.load_children()
for learning_objective in sub_competency.learning_objectives:
competency_text.extend(learning_objective.text.split("<p>"))
competency_item["text"] = competency_text
return competency_item
except (TypeError, KeyError) as e:
raise Exception("Failed to fetch competency") from e
def get_all_competencies(self):
"""returns all competencies given course"""
competency_list = []
try:
competencies = Competency.collection.fetch()
for competency in competencies:
competency_item = competency.get_fields(reformat_datetime=True)
competency_item["id"] = competency.id
competency_list.append(competency_item)
return competency_list
except (TypeError, KeyError) as e:
raise Exception("Failed to fetch all competencies") from e
def update_competency(self, id, competency):
"""updates all fields and/or parent of a competency"""
comp = Competency.find_by_id(id)
try:
competency_fields = comp.get_fields()
for key, value in competency.items():
competency_fields[key] = value
for key, value in competency_fields.items():
setattr(comp, key, value)
comp.update()
return self.get_competency(comp.id)
except (TypeError, KeyError) as e:
raise Exception("Failed to update the competency") from e
def delete_competency(self, id):
"""deletes a competency"""
competency = Competency.find_by_id(id)
all_courses = Course.collection.fetch()
for course in all_courses:
associated_comp_ids = course.competency_ids
if id in associated_comp_ids:
associated_comp_ids.remove(id)
course.competency_ids = associated_comp_ids
course.save()
# removing the cached course as topic tree
# changes on deleting the competency
delete_key(course.id)
all_lc = LearningContentItem.collection.fetch()
for lc in all_lc:
associated_comp_ids = lc.competency_ids
if id in associated_comp_ids:
associated_comp_ids.remove(id)
lc.competency_ids = associated_comp_ids
lc.save()
competency.delete_tree()
def create_learning_content_competency(self, content_id, competency):
"""creates a competency"""
comp = Competency()
for key, value in competency.items():
setattr(comp, key, value)
parent = LearningContentItem.find_by_id(content_id)
if parent:
comp.save()
parent.competency_ids.append(comp.id)
parent.save()
return self.get_learning_content_competency(comp.id, parent.id)
def get_learning_content_competency(self, id, parent_id):
"""returns competency given id"""
learning_content = LearningContentItem.find_by_id(parent_id)
competency = Competency.find_by_id(id)
if competency:
if not id in learning_content.competency_ids:
raise ResourceNotFoundException(
"This competency is not associated with current Learning Content")
try:
competency_item = competency.get_fields(reformat_datetime=True)
competency_item["id"] = competency.id
return competency_item
except (TypeError, KeyError) as e:
raise Exception("Failed to fetch competency") from e
def get_all_learning_content_competencies(self, content_id):
"""returns all competencies given course"""
competency_list = []
learning_content = LearningContentItem.find_by_id(content_id)
try:
learning_content.load_children()
for competency in learning_content.competencies:
competency_item = competency.get_fields(reformat_datetime=True)
competency_item["id"] = competency.id
competency_list.append(competency_item)
return competency_list
except (TypeError, KeyError) as e:
raise Exception("Failed to fetch all competencies") from e
def update_learning_content_competency(self, id, competency, parent_id):
"""updates all fields and/or parent of a competency"""
learning_content = LearningContentItem.find_by_id(parent_id)
comp = Competency.find_by_id(id)
if not id in learning_content.competency_ids:
raise ResourceNotFoundException(
"This competency is not associated with current course")
try:
competency_fields = comp.get_fields()
for key, value in competency.items():
competency_fields[key] = value
for key, value in competency_fields.items():
setattr(comp, key, value)
comp.update()
return self.get_competency(comp.id)
except (TypeError, KeyError) as e:
raise Exception("Failed to update the competency") from e
def delete_learning_content_competency(self, id, parent_id):
"""deletes a competency"""
Competency.find_by_id(id)
parent = LearningContentItem.find_by_id(parent_id)
if id in parent.competency_ids:
parent.competency_ids.remove(id)
parent.save()
else:
raise ResourceNotFoundException(
"Learning Content does not have this competency associated")