microservices/course_ingestion/services/course_inference.py (236 lines of code) (raw):
"""
CRUD for course
"""
import re
from typing import Dict
from utils.paginator import pagination
from common.utils.gcs_adapter import GcsCrudService
from common.utils.errors import ResourceNotFoundException, PayloadTooLargeError
from common.models import Course, Competency, LearningContentItem
from common.utils.cache_service import set_key, get_key, delete_key, \
set_key_normal, get_key_normal
from config import GCP_PROJECT, PAYLOAD_FILE_SIZE
# pylint: disable=redefined-builtin,broad-exception-raised,protected-access
# pylint: disable=raising-bad-type
class CourseService():
"""Class for course"""
def create_course(self, course):
"""creates a course"""
new_course = Course()
from_learning_content = course.get("add_competencies_from_learning_content",
False)
if from_learning_content:
course["competency_ids"] = self.get_all_learning_content_competencies(
course.get("learning_content_ids", []))
for key, value in course.items():
if key != "competency_ids":
setattr(new_course, key, value)
elif key == "competency_ids":
if not from_learning_content:
self.validate_new_competencies(value)
setattr(new_course, key, list(set(value)))
new_course.save()
return self.get_course(new_course.id)
def get_all_learning_content_competencies(self, learning_content_ids):
all_competencies = []
for lc_id in learning_content_ids:
learning_content = LearningContentItem.find_by_id(lc_id)
learning_content.load_children()
for competency in learning_content.competencies:
all_competencies.append(competency.id)
return all_competencies
def get_all_courses(self, skip: int, limit: int, sort_by: str, order_by: str,
competencies: bool, search_query: str):
"""returns all the courses"""
course_list = []
ord_by = order_by
if sort_by == "ascending" and order_by != "title":
ord_by = order_by
elif sort_by == "descending" and order_by != "title":
ord_by = "-{}".format(order_by)
courses = Course.collection.order(ord_by).fetch()
if courses:
try:
for course in courses:
course_dict = course.get_fields(reformat_datetime=True)
course_dict["id"] = course.id
competency_list = []
if competencies:
course.load_children()
for competency in course.competencies:
competency_item = competency.get_fields(reformat_datetime=True)
competency_item["id"] = competency.id
competency_list.append(competency_item)
course_dict["competencies"] = competency_list
course_dict.pop("competency_ids", None)
course_list.append(course_dict)
if search_query is not None:
course_list = [course for course in course_list if
search_query.lower() in course["title"].lower()]
if sort_by == "ascending" and order_by == "title":
course_list = sorted(course_list, key=lambda i: i["title"].lower())
elif sort_by == "descending" and order_by == "title":
course_list = sorted(course_list, key=lambda i: i["title"].lower(),
reverse=True)
if skip == 0 and limit == 0:
return course_list
elif skip < 0 or limit < 0:
raise Exception("The skip and limit value should be a positive "
"number")
else:
result = pagination(payload=course_list, skip=skip, limit=limit)
return {"data": result, "total_rec": len(course_list)}
except (TypeError, KeyError) as e:
raise Exception("Failed to fetch all courses") from e
else:
raise ResourceNotFoundException("No courses found")
def get_course(self, id):
"""returns course details for a given course id"""
course = Course.find_by_id(id)
try:
course_dict = course.get_fields(reformat_datetime=True)
course_dict["id"] = course.id
course.load_children()
competency_list = []
for competency in course.competencies:
competency_item = competency.get_fields(reformat_datetime=True)
competency_item["id"] = competency.id
competency_list.append(competency_item)
course_dict["competencies"] = competency_list
course_dict.pop("competency_ids", None)
return course_dict
except (TypeError, KeyError) as e:
raise Exception("Failed to fetch the course") from e
def validate_new_competencies(self, competency_ids):
"""Validates competency ids if they exist or not
from the competency ids list"""
for id in competency_ids:
Competency.find_by_id(id)
def update_course(self, id, course_request):
"""updates a given course"""
course = Course.find_by_id(id)
try:
course_fields = course.get_fields()
for key, value in course_request.items():
if key != "competency_ids":
course_fields[key] = value
elif key == "competency_ids":
self.validate_new_competencies(value)
course_fields[key] = list(set(value))
for key, value in course_fields.items():
setattr(course, key, value)
course.update()
return self.get_course(id)
except (TypeError, KeyError) as e:
raise Exception("Failed to update course") from e
def delete_course(self, id):
"""deletes course given course id"""
course = Course.find_by_id(id)
course.delete_by_id(id)
def add_competencies(self, id, request_body):
"""Updates and adds new competencies to the course"""
course = Course.find_by_id(id)
try:
course_fields = course.get_fields()
for key, value in request_body.items():
if key != "competency_ids":
course_fields[key] = value
for key, value in course_fields.items():
setattr(course, key, value)
new_competency_ids = request_body.get("competency_ids", [])
if new_competency_ids:
existing_comp_ids = course_fields["competency_ids"]
new_competency_ids = set(new_competency_ids) - set(existing_comp_ids)
new_competency_ids = list(new_competency_ids)
self.validate_new_competencies(new_competency_ids)
course_fields["competency_ids"].extend(new_competency_ids)
setattr(course, "competency_ids", course_fields["competency_ids"])
course.update()
return self.get_course(id)
except (TypeError, KeyError) as e:
raise Exception("Failed to update the course") from e
def fetch_course_linked_contents(self, course_id: str) -> list:
"""
Function is used to fetch course level learning contents
:param course_id: str
:return: list
"""
result = []
doc_key = f"courses/{course_id}"
course_details = Course.collection.get(key=doc_key)
content_details = LearningContentItem.collection.fetch()
for content in content_details:
lin_comp = []
for com_id in content.competency_ids:
for cou_com in course_details.competency_ids:
if com_id == cou_com:
lin_comp.append(cou_com)
if lin_comp:
if len(lin_comp) == len(content.competency_ids):
partial_linked = False
else:
partial_linked = True
res = {
"lc_title": content.title,
"lc_id": content.id,
"partial_linked": partial_linked,
"linked_competencies": lin_comp
}
result.append(res)
return result
def validate_upload_course_pdf_service(self, course_pdf: object,
user_id: str) -> Dict:
"""
Function is used to upload course pdf to gcs bucket
:param course_pdf: file object
:param user_id: string
:return: string
"""
file = course_pdf[0]
gcs_service = GcsCrudService(GCP_PROJECT)
try:
search = re.search(".pdf$", file["filename"].lower())
if search is not None:
folders = gcs_service.fetch_all_blobs(prefix="course-resources")
folders = [blob.name for blob in folders if ".pdf" in blob.name.lower()]
gs_files = [gs_file.split("/")[2] for gs_file in folders]
if file["filename"].lower() not in gs_files:
res = {"validation": True, "message": "File is ready to upload"}
else:
res = {"validation": False,
"message": f"The file already exists under this file name "
f"'{file['filename'].split('.')[0]}'. Click "
f"'Cancel' to cancel the upload or 'Yes' to "
f"overwrite the existing file. "}
if len(file["body"]) <= PAYLOAD_FILE_SIZE:
set_key(key=f"{user_id}_file_name", value=file["filename"],
expiry_time=600)
set_key_normal(key=f"{user_id}_file_body", value=file["body"],
expiry_time=600)
else:
raise PayloadTooLargeError(
"File size is too large to upload,please use the file upto 80MB")
return res
else:
return {"validation": False,
"message": f"Invalid file format"
f" {file['filename'].split('.')[1]}, Course file "
f"must be in PDF format only,other formats will not "
f"be accepted. Please try again with the PDF file. "}
except IndexError:
return {"validation": False,
"message": f"There is no file extension in the file called "
f"'{file['filename']}'.So, upload the PDF file with "
f"the file extension."}
def upload_course_pdf_service(self, user_id: str) -> Dict[str, str]:
"""
Function to upload the PDF file to gcs bucket
:param user_id: str
:return: str
"""
file_name = get_key(key=f"{user_id}_file_name")
file_body = get_key_normal(key=f"{user_id}_file_body")
res = GcsCrudService(GCP_PROJECT).upload_file_to_gcs_bucket(
file_name=file_name, file_body=file_body,
parent_folder_name="course-resources")
res["file_name"] = file_name
delete_key("file_name")
delete_key("file_body")
return res
def fetch_all_course_pdf_service(self, search_query: str) -> list:
"""
Function to fetch all the PDF file from GCS bucket
:param search_query: string
:return: list
"""
gs_path = []
gcs_service = GcsCrudService(GCP_PROJECT)
blobs = gcs_service.fetch_all_blobs(prefix="course-resources")
for blob in blobs:
path = f"gs://{GCP_PROJECT}/{blob.name}"
file_name = blob.name.lower()
if ".pdf" in file_name.lower():
if search_query is None:
gs_path.append({
"gs_path": path,
"file_path": file_name,
"file_name": file_name.split("/")[2].split(".")[0]
})
else:
if search_query.lower() in (file_name.split("/")[2].split(".")[
0].lower()):
gs_path.append({
"gs_path": path,
"file_path": file_name,
"file_name": file_name.split("/")[2].split(".")[0]
})
return gs_path
def delete_the_blob_from_bucket(self, gs_path: str) -> str:
"""
Function to delete the blob from the GCS bucket
:param gs_path: string
:return: str
"""
gcs_service = GcsCrudService(GCP_PROJECT)
res = gcs_service.delete_file_from_gcs_bucket(blob_name=gs_path)
return res