microservices/course_ingestion/services/learning_content_inference.py (268 lines of code) (raw):

"""learning_content inference""" import os import json import shutil from typing import Union, Any from common.utils.gcs_adapter import download_file_from_gcs from common.utils.logging_handler import Logger from .parsers.custom.custom_spider import CustomSpider from .parsers.custom.toc_spider import CustomTOCSpider from common.models import LearningContentItem, Competency from common.utils.errors import ResourceNotFoundException from .parsers.openstax.openstax_spider import parse_content from .parsers.custom.custom_pdf_parser import CustomPdfParser from .import_learning_content import create_learning_content_collections, \ create_clustering_learning_content_collections from .parsers.custom.custom_html_paragraph_parser import CustomHtmlParser from .clustering.hierarchical_clustering import create_recursive_topic_tree from .parsers.custom.custom_pdf_paragraph_parser import CustomPdfParagraphParser from utils.paginator import pagination from utils.exception_handlers import LearningContentNotFound # pylint: disable=protected-access # pylint: disable=redefined-builtin # pylint: disable=raise-missing-from # pylint: disable=unspecified-encoding # pylint: disable=broad-exception-raised def clean_temp_folders(): """deletes all tmp files""" try: shutil.rmtree(".tmp") except FileNotFoundError: pass try: shutil.rmtree("data") except FileNotFoundError: pass async def create_learning_content_using_clustering(request_body): """creates learning_content using clustering""" learning_content_url = request_body["gcs_path"] learning_content_title = request_body["title"] start_page = request_body.get("start_page", 1) end_page = request_body.get("end_page", None) description = request_body.get("description", "") created_by = request_body.get("created_by", "") last_modified_by = request_body.get("last_modified_by", "") doc_format = request_body["format"] create_learning_units = request_body.get("create_learning_units", True) create_triples = request_body.get("create_triples", False) course_category = request_body.get("course_category", "") if doc_format.lower() in ["pdf", "epub"]: file_path = download_file_from_gcs(gcs_path=learning_content_url) paragraphs = parse_pdf_paragraph( file_path=file_path, start_page=start_page, end_page=end_page) elif doc_format.lower() == "html": paragraphs = parse_html_paragraphs(learning_content_url) else: raise Exception("Unsupported document format") titles_flag = request_body.get("titles_needed", True) course = await create_recursive_topic_tree( paragraphs, node_level="course", titles_flag=titles_flag, create_learning_units=create_learning_units, create_triples=create_triples) if not os.path.exists(".tmp"): os.makedirs(".tmp") with open(".tmp/topic_modelling_output_reduced_dim.json", "w") as fp: json_string = json.dumps(str(course), indent=2) fp.write(json_string) created_learning_content_id = create_clustering_learning_content_collections( learning_content_title, description, learning_content_url, start_page, end_page, doc_format.lower(), ".tmp/topic_modelling_output_reduced_dim.json", last_modified_by, created_by, create_learning_units, create_triples, course_category) clean_temp_folders() return get_learning_content(created_learning_content_id) async def create_learning_content(request_body): """root function which integrates parser and content tree creation""" using_parser = request_body.get("using_parser", False) try: if using_parser: return create_learning_content_using_parser(request_body) else: return await create_learning_content_using_clustering(request_body) finally: clean_temp_folders() def create_learning_content_using_parser(request_body): """creates learning content using only parser""" learning_content_url = request_body["gcs_path"] learning_content_title = request_body["title"] doc_format = request_body["format"] start_page = request_body.get("start_page", 1) end_page = request_body.get("end_page", None) created_by = request_body.get("created_by", "") last_modified_by = request_body.get("last_modified_by", "") toc_output_json = ".tmp/toc.jl" competency_output_json = ".tmp/output.json" doc_type = request_body.get("doc_type", "custom") course_category = request_body.get("course_category", "") if doc_format.lower() == "pdf": file_path = download_file_from_gcs(gcs_path=learning_content_url) parse_pdf( url=file_path, output_file=competency_output_json, doc_type=doc_type) elif doc_format.lower() == "html": ## TODO download files from gcs. Do we need to download an URL from GCS? parse_html( url=learning_content_url, output_file=competency_output_json, output_toc_file=toc_output_json, doc_type=doc_type) else: raise Exception("Unsupported document format") content_id = create_learning_content_collections(learning_content_title, learning_content_url, doc_format.lower(), start_page, end_page, competency_output_json, last_modified_by, created_by, course_category) clean_temp_folders() return get_learning_content(content_id) def parse_pdf_paragraph(file_path, start_page=1, end_page=None): """extracts all paragraphs for a pdf file""" paragraph_parser = CustomPdfParagraphParser( path=file_path, start_page=start_page, end_page=end_page) return paragraph_parser.get_paragraphs() def parse_html_paragraphs(url): """extracts all paragraphs for a html file""" paragraph_parser = CustomHtmlParser(url) return paragraph_parser.get_paragraphs() def parse_pdf(url, output_file, doc_type): """handles parsing of pdf doc - used in parser alone content creation""" if doc_type == "custom": pdf_parser = CustomPdfParser(path=url, output_filename=output_file) pdf_parser.get_result() else: raise Exception("Pdf parser for Openstax will be released in next release") def parse_html(url, output_file, output_toc_file, doc_type): """handles parsing of static and dynamic html files - used in parser alone content creation""" if doc_type == "custom": toc_parser = CustomTOCSpider(url=url, output_filename=output_toc_file) toc_parser.get_toc() content_parser = CustomSpider( url=url, toc_file=output_toc_file, output_filename=output_file) content_parser.parse_homepage() else: parse_content(url, output_toc_file, output_file) def get_all_learning_contents(): """returns all the learning_contents""" learning_content_list = [] learning_contents = LearningContentItem.collection.fetch() if learning_contents: try: for learning_content in learning_contents: learning_content_dict = ( learning_content.get_fields(reformat_datetime=True)) learning_content_dict["id"] = learning_content.id competency_list = [] learning_content.load_children() for competency in learning_content.competencies: competency_item = competency.get_fields(reformat_datetime=True) competency_item["id"] = competency.id competency_list.append(competency_item) learning_content_dict["competencies"] = competency_list learning_content_dict.pop("competency_ids", None) learning_content_list.append(learning_content_dict) return learning_content_list except (TypeError, KeyError) as e: raise Exception("Failed to fetch all learning_contents") from e else: raise ResourceNotFoundException("No learning_contents found") def get_learning_contents(skip: int, limit: int, sort_by: str, order_by: str, search_query: str) -> Union[Any, str]: """ Function for get all learning contents :param skip: int :param limit: int :param sort_by: str :param order_by: str :param search_query: str :return: dict """ l_contents = [] ord_by = order_by if sort_by == "ascending" and order_by != "title": ord_by = order_by elif sort_by == "descending" and order_by != "title": ord_by = "-{}".format(order_by) try: contents = LearningContentItem.collection.order(ord_by).fetch() if contents: for content in contents: data = content.get_fields(reformat_datetime=True) data["content_id"] = content.id l_contents.append(data) if search_query is not None: l_contents = [content for content in l_contents if search_query.lower() in content["title"].lower()] if sort_by == "ascending" and order_by == "title": l_contents = sorted(l_contents, key=lambda i: i["title"].lower()) elif sort_by == "descending" and order_by == "title": l_contents = sorted(l_contents, key=lambda i: i["title"].lower(), reverse=True) if skip >= 0 and limit > 0: result = pagination(payload=l_contents, skip=skip, limit=limit) res = { "data": result, "total_rec": len(l_contents) } elif skip < 0 or limit < 0: raise Exception("The skip and limit value should be a positive number") elif skip == 0 and limit == 0: return l_contents else: res = { "data": l_contents, "total_rec": len(l_contents) } return res else: raise ResourceNotFoundException("No learning_contents found") except Exception as e: Logger.error("No Learning content found: {}".format(e)) return False def get_learning_content(id): """returns learning_content details for a given learning_content id""" learning_content = LearningContentItem.find_by_id(id) try: learning_content_dict = ( learning_content.get_fields(reformat_datetime=True)) learning_content_dict["id"] = learning_content.id learning_content.load_children() competency_list = [] for competency in learning_content.competencies: competency_item = competency.get_fields(reformat_datetime=True) competency_item["id"] = competency.id competency_list.append(competency_item) learning_content_dict["competencies"] = competency_list learning_content_dict.pop("competency_ids", None) return learning_content_dict except (TypeError, KeyError) as e: raise Exception("Failed to fetch the learning_content") from e def validate_new_competencies(competency_ids): """Validates competency ids if they exist or not from the competency ids list""" for id in competency_ids: Competency.find_by_id(id) def update_learning_content(id, content_request): """updates a given learning_content""" learning_content = LearningContentItem.find_by_id(id) try: learning_content_fields = learning_content.get_fields() for key, value in content_request.items(): if key != "competency_ids": learning_content_fields[key] = value elif key == "competency_ids": validate_new_competencies(value) learning_content_fields[key] = list(set(value)) for key, value in learning_content_fields.items(): setattr(learning_content, key, value) learning_content.update() return get_learning_content(id) except (TypeError, KeyError) as e: raise Exception("Failed to update learning_content") from e def delete_learning_content(content_id: str) -> None: """ Function to Unlink all competency from the LC and Delete that LC Parameters ---------- content_id: str Return ------ None """ try: content = LearningContentItem.find_by_id(content_id) if len(content.competency_ids) != 0: for comp_id in content.competency_ids: content.competency_ids.remove(comp_id) content.save() content.delete_by_id(content_id) else: content.delete_by_id(content_id) except ResourceNotFoundException as e: Logger.error(f"Learning content not found: {e}") raise LearningContentNotFound("Learning Content Is Not Found") def add_competencies(id, request_body): """Updates and adds new competencies ids to learning content""" learning_content = LearningContentItem.find_by_id(id) try: learning_content_fields = learning_content.get_fields() for key, value in request_body.items(): if key != "competency_ids": learning_content_fields[key] = value for key, value in learning_content_fields.items(): setattr(learning_content, key, value) new_competency_ids = request_body.get("competency_ids", []) if new_competency_ids: existing_comp_ids = learning_content_fields["competency_ids"] new_competency_ids = set(new_competency_ids) - set(existing_comp_ids) new_competency_ids = list(new_competency_ids) validate_new_competencies(new_competency_ids) learning_content_fields["competency_ids"].extend(new_competency_ids) setattr(learning_content, "competency_ids", learning_content_fields["competency_ids"]) learning_content.update() return get_learning_content(id) except (TypeError, KeyError) as e: raise Exception("Failed to update the learning content") from e