microservices/course_ingestion/services/clustering/hierarchical_clustering.py

"""hierarchical clustering""" from concurrent.futures import ThreadPoolExecutor from sentence_transformers import SentenceTransformer import numpy as np import requests from umap import UMAP from sklearn.cluster import AgglomerativeClustering from sklearn.metrics import silhouette_score import json from config import SERVICES, TITLE_SIMILARITY_CER_THRESHOLD, TITLE_GENERATION_BATCH_SIZE import tornado.ioloop from string import punctuation from tornado.gen import multi from transformers import T5Tokenizer from textacy.extract import keyterms from services.triple_inference import TripleService import spacy import editdistance nlp = spacy.load("en_core_web_sm") triple_service = TripleService() tokenizer = T5Tokenizer.from_pretrained("t5-base") #pylint: disable=consider-using-with,broad-exception-raised title_generation_executor = ThreadPoolExecutor(max_workers=1) summarization_executor = ThreadPoolExecutor(max_workers=8) hierarchical_clustering = AgglomerativeClustering( affinity="cosine", linkage="complete") umap_model = UMAP(random_state=42, n_neighbors=5, n_components=20, min_dist=0.0, metric="cosine", verbose=False) sentence_model = SentenceTransformer("distilbert-base-nli-stsb-mean-tokens") def is_nan(num): """Check if num is empty.""" return num == "" def cer(original, result): r"""The CER is defined as the editing/Levenshtein distance. character level Levenshtein distance divided by the amount of characters in the original text. In case of the original having more charactes (N) than the result and both being totally different (all N characters resulting in 1 edit operation each), the CER will always be 1 (N / N = 1). """ # The WER ist calculated on word (and NOT on character) level. # Therefore we split the strings into words first: if is_nan(result): result = "" original = list(original) result = list(result) return editdistance.eval(original, result) / float(len(original)) def get_blooms_titles(texts, max_title_length, n_titles=1): try: prediction = json.loads( requests.post( url="http://{}:{}/title-generation/api/v1/blooms-title-generation" .format( SERVICES["title-generation"]["host"], # "0.0.0.0", SERVICES["title-generation"]["port"], ), json={ "texts": texts, "max_title_length": max_title_length, "n_titles": n_titles }, ).content)["data"] preds = prediction["titles"] return preds except ConnectionError as e: raise Exception("Failed to connect with title \ generation microservice") from e except (TypeError, KeyError) as e: raise Exception("Internal server error") from e def get_titles(texts, max_title_length, n_titles=1): try: prediction = json.loads( requests.post( url="http://{}:{}/title-generation/api/v1/title-generation".format( SERVICES["title-generation"]["host"], # "0.0.0.0", SERVICES["title-generation"]["port"], ), json={ "texts": texts, "max_title_length": max_title_length, "n_titles": n_titles }, ).content)["data"] preds = prediction["titles"] return preds except ConnectionError as e: raise Exception("failed to connect with \ title generation microservice") from e except (TypeError, KeyError) as e: raise Exception("Internal server error") from e def get_summary(text, ratio=0.3): try: prediction = json.loads( requests.post( url="http://{}:{}/extractive-summarization/api/v1/summarize" .format( SERVICES["extractive-summarization"]["host"], # "0.0.0.0", SERVICES["extractive-summarization"]["port"], ), json={ "data": text, "ratio": ratio }, ).content)["data"] return prediction["summary"] except ConnectionError as e: raise Exception("failed to connect with extractive \ 
def join_texts(texts):
    """Join documents, inserting a period wherever the accumulated text
    does not already end with punctuation."""
    if texts:
        joined_text = texts[0].strip()
        for text in texts[1:]:
            if (len(joined_text) > 0) and (joined_text[-1] not in punctuation):
                joined_text = joined_text + ". " + text
            else:
                joined_text = joined_text + " " + text
    else:
        joined_text = ""
    return joined_text


async def compress_text_for_title_generation(texts):
    """Compress texts to fit the 512-token input budget of the title
    generator, summarizing each document proportionally to its share of
    the total tokens."""
    tokens_count = [len(tokenizer.tokenize(text)) for text in texts]
    total_tokens = sum(tokens_count)
    text = ""
    if total_tokens > 0:
        if total_tokens > 512:
            sentence_ratios = [(512 / total_tokens) * (n_token / total_tokens)
                               for n_token in tokens_count]
            summarised_docs = await multi([
                tornado.ioloop.IOLoop.current().run_in_executor(
                    summarization_executor, get_summary, doc, ratio)
                for doc, ratio in zip(texts, sentence_ratios)
            ])
            text = join_texts(summarised_docs)
        else:
            text = join_texts(texts)
    return text


def get_batches(iterable, n=1):
    """Split iterable into consecutive batches of at most n items."""
    batches = []
    length = len(iterable)
    for ndx in range(0, length, n):
        batches.append(iterable[ndx:min(ndx + n, length)])
    return batches


async def get_all_titles(combined_text_list,
                         max_title_length,
                         batch_size=32,
                         blooms_title=False,
                         n_titles=5):
    """Generate titles for all texts, batched through the single-worker
    title-generation executor."""
    if blooms_title:
        all_titles = await multi([
            tornado.ioloop.IOLoop.current().run_in_executor(
                title_generation_executor, get_blooms_titles, batch,
                max_title_length, n_titles)
            for batch in get_batches(combined_text_list, batch_size)
        ])
    else:
        all_titles = await multi([
            tornado.ioloop.IOLoop.current().run_in_executor(
                title_generation_executor, get_titles, batch,
                max_title_length, n_titles)
            for batch in get_batches(combined_text_list, batch_size)
        ])
    all_titles = [item for sublist in all_titles for item in sublist]
    return all_titles


def update_titles(topic_tree, titles_dict, level, next_indices):
    """Write pre-generated titles from titles_dict onto the tree, walking
    it level by level and tracking positions via next_indices."""
    try:
        if level == "course":
            for competency, title in zip(topic_tree,
                                         titles_dict["competency"][0]):
                competency["title"] = title
            for i, competency in enumerate(topic_tree):
                new_node = update_titles(competency["sub_competencies"],
                                         titles_dict, "competency",
                                         next_indices)
                topic_tree[i]["sub_competencies"] = new_node
                next_indices["comp_ind"] += 1
        elif level == "competency":
            for sub_competency, title in zip(
                    topic_tree,
                    titles_dict["sub_competency"][next_indices["comp_ind"]]):
                sub_competency["title"] = title
            for i, sub_comp in enumerate(topic_tree):
                new_node = update_titles(sub_comp["learning_objectives"],
                                         titles_dict, "sub_competency",
                                         next_indices)
                topic_tree[i]["learning_objectives"] = new_node
                next_indices["subcomp_ind"] += 1
        elif level == "sub_competency":
            for learning_objective, title in zip(
                    topic_tree,
                    titles_dict["learning_objective"][
                        next_indices["subcomp_ind"]]):
                learning_objective["title"] = title
        elif level == "learning_objective":
            for learning_unit, title in zip(
                    topic_tree,
                    titles_dict["learning_unit"][next_indices["lo_ind"]]):
                learning_unit["title"] = title
        else:
            raise Exception("Undefined topic tree level - {}".format(level))
        return topic_tree
    except (KeyError, IndexError) as e:
        raise Exception("Internal server error") from e


def get_optimum_clusters(model, data, level):
    """Pick the cluster count that maximizes the silhouette score, bounded
    by a per-level minimum of documents per node, then return the labels."""
    sil_score_max = -1
    level_map = {
        "competency": 20,
        "sub_competency": 10,
        "learning_objective": 3,
        "learning_unit": 1
    }
    possible_cluster_count = len(data) // level_map[level]
    if possible_cluster_count < 2:
        return [0 for _ in range(len(data))]
    elif possible_cluster_count == 2:
        model.n_clusters = 2
        return model.fit_predict(data)
    else:
        for n_clusters in range(2, possible_cluster_count):
            model.n_clusters = n_clusters
            labels = model.fit_predict(data)
            sil_score = silhouette_score(data, labels)
            if sil_score > sil_score_max:
                sil_score_max = sil_score
                best_n_clusters = n_clusters
        model.n_clusters = best_n_clusters
        return model.fit_predict(data)
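
# Worked examples (illustrative only):
#   get_batches([1, 2, 3, 4, 5], n=2)      -> [[1, 2], [3, 4], [5]]
#   join_texts(["First point", "second."]) -> "First point. second."
# For get_optimum_clusters with 100 documents at level="sub_competency",
# possible_cluster_count = 100 // 10 = 10, so n_clusters is swept over 2..9,
# each clustering is scored with silhouette_score, and the model is refit
# with the best-scoring count.
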
def get_recursive_tree(clustering_model, embeddings, node_level, documents,
                       doc_ids, text_list, create_learning_units,
                       create_triples):
    """Recursively cluster documents into competencies, sub-competencies,
    learning objectives, learning units and (optionally) triples.

    Each node's "title" field is filled with an index into text_list;
    add_titles_to_tree later replaces it with a generated title.
    """
    if node_level == "course":
        clusters = get_optimum_clusters(
            clustering_model, embeddings[doc_ids], level="competency")
        comps = []
        for comp_cluster in set(clusters):
            comp = {}
            comp["competency"] = comp_cluster
            comp["document_ids"] = [
                doc_id for doc_id, cluster_id in zip(doc_ids, clusters)
                if cluster_id == comp_cluster
            ]
            comp["text"] = join_texts(
                [documents[i] for i in comp["document_ids"]])
            comp["title"] = len(text_list)
            text_list.append({
                "docs": [documents[i] for i in comp["document_ids"]],
                "blooms_title": False
            })
            comp["sub_competencies"], text_list = get_recursive_tree(
                clustering_model, embeddings, "competency", documents,
                comp["document_ids"], text_list, create_learning_units,
                create_triples)
            comps.append(comp)
        return comps, text_list
    elif node_level == "competency":
        clusters = get_optimum_clusters(
            clustering_model, embeddings[doc_ids], level="sub_competency")
        scs = []
        for sc_cluster in set(clusters):
            sc = {}
            sc["sub_competency"] = sc_cluster
            sc["document_ids"] = [
                doc_id for doc_id, cluster_id in zip(doc_ids, clusters)
                if cluster_id == sc_cluster
            ]
            sc["text"] = join_texts([documents[i] for i in sc["document_ids"]])
            sc["title"] = len(text_list)
            text_list.append({
                "docs": [documents[i] for i in sc["document_ids"]],
                "blooms_title": False
            })
            sc["learning_objectives"], text_list = get_recursive_tree(
                clustering_model, embeddings, "sub_competency", documents,
                sc["document_ids"], text_list, create_learning_units,
                create_triples)
            scs.append(sc)
        return scs, text_list
    elif node_level == "sub_competency":
        clusters = get_optimum_clusters(
            clustering_model, embeddings[doc_ids], level="learning_objective")
        los = []
        for lo_cluster in set(clusters):
            lo = {}
            lo["learning_objective"] = lo_cluster
            lo["document_ids"] = [
                doc_id for doc_id, cluster_id in zip(doc_ids, clusters)
                if cluster_id == lo_cluster
            ]
            lo["text"] = "<p>".join([documents[i] for i in lo["document_ids"]])
            lo["title"] = len(text_list)
            text_list.append({
                "docs": [documents[i] for i in lo["document_ids"]],
                "blooms_title": True
            })
            if create_learning_units:
                lo["learning_units"], text_list = get_recursive_tree(
                    clustering_model, embeddings, "learning_objective",
                    documents, lo["document_ids"], text_list, False,
                    create_triples)
            los.append(lo)
        return los, text_list
    elif node_level == "learning_objective":
        clusters = get_optimum_clusters(
            clustering_model, embeddings[doc_ids], level="learning_unit")
        lus = []
        for lu_cluster in set(clusters):
            lu = {}
            lu["learning_unit"] = lu_cluster
            lu["document_ids"] = [
                doc_id for doc_id, cluster_id in zip(doc_ids, clusters)
                if cluster_id == lu_cluster
            ]
            lu["text"] = "<p>".join([documents[i] for i in lu["document_ids"]])
            lu["topics"] = get_topics(lu["text"].replace("<p>", " "))
            lu["title"] = len(text_list)
            text_list.append({
                "docs": [documents[i] for i in lu["document_ids"]],
                "blooms_title": True
            })
            if create_triples:
                lu["triples"], text_list = get_recursive_tree(
                    clustering_model, embeddings, "learning_unit", documents,
                    lu["document_ids"], text_list, False, create_triples)
            lus.append(lu)
        return lus, text_list
    elif node_level == "learning_unit":
        lu_text_list = [" ".join([documents[i] for i in doc_ids])]
        triples = triple_service.generate_triples(lu_text_list)[0]
        return triples, text_list
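
# Shape sketch (illustrative) of the tree returned for node_level="course",
# before titles are generated ("title" is still an index into text_list):
# [{"competency": 0, "document_ids": [...], "text": "...", "title": 0,
#   "sub_competencies": [
#       {"sub_competency": 0, ..., "learning_objectives": [
#           {"learning_objective": 0, ..., "learning_units": [
#               {"learning_unit": 0, "topics": [...], "triples": [...]}
#           ]}
#       ]}
#   ]}]
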
# pylint: disable=broad-except
async def create_recursive_topic_tree(documents,
                                      node_level="course",
                                      titles_flag=True,
                                      create_learning_units=True,
                                      create_triples=True):
    """Embed documents, reduce the embeddings with UMAP (falling back to a
    smaller UMAP and then to the raw embeddings on failure), build the
    recursive topic tree, and optionally generate and de-duplicate titles."""
    embeddings = np.array(
        sentence_model.encode(documents, show_progress_bar=True))
    try:
        reduced_embeddings = umap_model.fit_transform(embeddings)
    except Exception:
        try:
            umap_model_modified = UMAP(random_state=42,
                                       n_neighbors=2,
                                       n_components=10,
                                       min_dist=0.0,
                                       metric="cosine")
            reduced_embeddings = umap_model_modified.fit_transform(embeddings)
        except Exception:
            reduced_embeddings = embeddings
    topic_tree, text_list = get_recursive_tree(
        hierarchical_clustering,
        reduced_embeddings,
        node_level,
        documents,
        range(len(documents)),
        text_list=[],
        create_learning_units=create_learning_units,
        create_triples=create_triples)
    if titles_flag:
        text_list_with_summaries = await get_summarized_texts(text_list)
        titles_list = await generate_titles(text_list_with_summaries,
                                            max_length=32)
        topic_tree = add_titles_to_tree(
            topic_tree,
            titles_list,
            node_level=node_level,
            create_learning_units=create_learning_units)
        topic_tree = check_for_duplicate_titles(topic_tree, node_level,
                                                create_learning_units)
    return topic_tree


def handle_duplicate_titles_sub_comp(sub_competency):
    """Check whether duplicate learning-unit titles exist inside a
    sub-competency and, if they do, append Part-{X} to each duplicate."""
    learning_objectives = sub_competency["learning_objectives"]
    title_mapping = {}
    for lo_index, learning_objective in enumerate(learning_objectives):
        learning_units = learning_objective["learning_units"]
        for lu_index, learning_unit in enumerate(learning_units):
            lu_title = learning_unit["title"]
            if lu_title in title_mapping:
                prev_lo_index, prev_lu_index, count = title_mapping[lu_title]
                if count > 1:
                    title_mapping[lu_title][-1] += 1
                    learning_unit["title"] = lu_title + " Part-" + str(count + 1)
                else:
                    # First duplicate found: retroactively suffix the
                    # original occurrence as well.
                    title_mapping[lu_title][-1] += 1
                    prev_unit = learning_objectives[prev_lo_index][
                        "learning_units"][prev_lu_index]
                    prev_unit["title"] += " Part-" + str(count)
                    learning_unit["title"] += " Part-" + str(count + 1)
            else:
                title_mapping[lu_title] = [lo_index, lu_index, 1]
    return sub_competency


def check_for_duplicate_titles(topic_tree, node_level, create_learning_units,
                               is_list=True):
    """Merge sibling nodes at each level that share the same title."""
    if node_level == "course":
        updated_topic_tree = []
        for competency in topic_tree:
            competency = check_for_duplicate_titles(
                competency, "competency", create_learning_units, is_list=False)
            updated_topic_tree.append(competency)
        return updated_topic_tree
    elif node_level == "competency":
        updated_topic_tree = []
        if is_list:
            sub_competencies = topic_tree
        else:
            sub_competencies = topic_tree["sub_competencies"]
        for sub_competency in sub_competencies:
            sub_competency = check_for_duplicate_titles(
                sub_competency, "sub_competency", create_learning_units,
                is_list=False)
            sub_competency = handle_duplicate_titles_sub_comp(sub_competency)
            updated_topic_tree.append(sub_competency)
        if is_list:
            return updated_topic_tree
        topic_tree["sub_competencies"] = updated_topic_tree
        return topic_tree
    elif node_level == "sub_competency":
        updated_topic_tree = []
        titles_dict = {}
        if is_list:
            learning_objectives = topic_tree
        else:
            learning_objectives = topic_tree["learning_objectives"]
        for i, learning_objective in enumerate(learning_objectives):
            title = learning_objective["title"]
            if title not in titles_dict:
                titles_dict[title] = i
            else:
                # Fold this duplicate into the first node with the same title.
                prev_index = titles_dict[title]
                prev_lo = learning_objectives[prev_index]
                prev_lo["document_ids"].extend(
                    learning_objective["document_ids"])
                prev_lo["text"] = "<p>".join(
                    [prev_lo["text"], learning_objective["text"]])
                if "learning_units" in prev_lo:
                    prev_lo["learning_units"].extend(
                        learning_objective["learning_units"])
        for _, value in titles_dict.items():
            updated_topic_tree.append(learning_objectives[value])
        if create_learning_units:
            final_tree = []
            for learning_objective in updated_topic_tree:
                learning_objective = check_for_duplicate_titles(
                    learning_objective, "learning_objective",
                    create_learning_units, is_list=False)
                final_tree.append(learning_objective)
            updated_topic_tree = final_tree
        if is_list:
            return updated_topic_tree
        topic_tree["learning_objectives"] = updated_topic_tree
        return topic_tree
    elif node_level == "learning_objective":
        updated_topic_tree = []
        titles_dict = {}
        if is_list:
            learning_units = topic_tree
        else:
            learning_units = topic_tree["learning_units"]
        for i, learning_unit in enumerate(learning_units):
            title = learning_unit["title"]
            if title not in titles_dict:
                titles_dict[title] = i
            else:
                prev_index = titles_dict[title]
                prev_lu = learning_units[prev_index]
                prev_lu["document_ids"].extend(learning_unit["document_ids"])
                prev_lu["text"] = "<p>".join(
                    [prev_lu["text"], learning_unit["text"]])
                if "triples" in prev_lu:
                    prev_lu["triples"].extend(learning_unit["triples"])
        for _, value in titles_dict.items():
            updated_topic_tree.append(learning_units[value])
        if is_list:
            return updated_topic_tree
        topic_tree["learning_units"] = updated_topic_tree
        return topic_tree
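
# Worked example (illustrative) for handle_duplicate_titles_sub_comp: three
# learning units all titled "Recursion" within one sub-competency come out as
# "Recursion Part-1", "Recursion Part-2" and "Recursion Part-3"; the first
# occurrence is only renamed retroactively once a duplicate appears.
# check_for_duplicate_titles instead *merges* same-titled siblings, extending
# the first node's document_ids, text and children with the duplicate's.
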
# pylint: disable=consider-using-set-comprehension
def get_topics(text):
    """Extract up to ten TextRank keyterms with salience scores."""
    all_entities = keyterms.textrank(
        nlp(text),
        position_bias=False,
        topn=10,
        window_size=3,
        include_pos=("NOUN", "ADJ"))
    all_entities = [{
        "entity": ent[0],
        "salience": round(ent[1], 3)
    } for ent in all_entities]
    return all_entities


async def get_summarized_texts(text_list):
    """Attach a summarised_text to every entry, compressed to fit the
    title generator's input budget."""
    for entry in text_list:
        entry["summarised_text"] = await compress_text_for_title_generation(
            entry["docs"])
    return text_list


async def generate_titles(text_list_with_summaries, max_length):
    """Generate candidate titles for every summarised text, routing entries
    to the Bloom's or the plain title endpoint as flagged."""
    blooms_indices = []
    non_blooms_indices = []
    for i, entry in enumerate(text_list_with_summaries):
        if entry["blooms_title"]:
            blooms_indices.append(i)
        else:
            non_blooms_indices.append(i)
    blooms_text_list = [
        text_list_with_summaries[j]["summarised_text"] for j in blooms_indices
    ]
    non_blooms_text_list = [
        text_list_with_summaries[j]["summarised_text"]
        for j in non_blooms_indices
    ]
    blooms_titles = await get_all_titles(
        blooms_text_list,
        max_title_length=max_length,
        batch_size=TITLE_GENERATION_BATCH_SIZE,
        blooms_title=True,
        n_titles=5)
    non_blooms_titles = await get_all_titles(
        non_blooms_text_list,
        max_title_length=max_length,
        batch_size=TITLE_GENERATION_BATCH_SIZE,
        blooms_title=False,
        n_titles=5)
    for i, titles in zip(blooms_indices, blooms_titles):
        text_list_with_summaries[i]["title"] = titles
    for i, titles in zip(non_blooms_indices, non_blooms_titles):
        text_list_with_summaries[i]["title"] = titles
    return [entry["title"] for entry in text_list_with_summaries]


def get_filtered_title(parent_title, candidate_titles):
    """Pick the first candidate that differs enough from the parent title
    (CER above TITLE_SIMILARITY_CER_THRESHOLD); otherwise fall back to
    the most dissimilar candidate."""
    if parent_title:
        cers = [cer(parent_title, candidate_title)
                for candidate_title in candidate_titles]
        for i, cer_value in enumerate(cers):
            if cer_value > TITLE_SIMILARITY_CER_THRESHOLD:
                return candidate_titles[i]
        return candidate_titles[np.argmax(cers)]
    return candidate_titles[0]
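
# Illustrative example for get_filtered_title, assuming a threshold of 0.5
# (the actual value comes from config.TITLE_SIMILARITY_CER_THRESHOLD):
#   get_filtered_title("Machine Learning",
#                      ["Machine Learning", "Supervised Models"])
# skips the first candidate (CER 0.0 against the parent) and returns
# "Supervised Models", so a child never repeats its parent's title verbatim.
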
def add_titles_to_tree(topic_tree,
                       titles_list,
                       parent_title=None,
                       node_level="course",
                       create_learning_units=True):
    """Replace each node's integer title index with a generated title,
    filtering competency and learning-unit titles against their parents."""
    if node_level == "course":
        for node in topic_tree:
            node["title"] = titles_list[node["title"]][0]
            node["sub_competencies"] = add_titles_to_tree(
                node["sub_competencies"],
                titles_list,
                parent_title=node["title"],
                node_level="competency",
                create_learning_units=create_learning_units)
        return topic_tree
    elif node_level == "competency":
        for node in topic_tree:
            node["title"] = get_filtered_title(parent_title,
                                               titles_list[node["title"]])
            node["learning_objectives"] = add_titles_to_tree(
                node["learning_objectives"],
                titles_list,
                node_level="sub_competency",
                create_learning_units=create_learning_units)
        return topic_tree
    elif node_level == "sub_competency":
        for node in topic_tree:
            node["title"] = titles_list[node["title"]][0]
            if create_learning_units:
                node["learning_units"] = add_titles_to_tree(
                    node["learning_units"],
                    titles_list,
                    parent_title=node["title"],
                    node_level="learning_objective")
        return topic_tree
    elif node_level == "learning_objective":
        for node in topic_tree:
            node["title"] = get_filtered_title(parent_title,
                                               titles_list[node["title"]])
        return topic_tree
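
# Minimal end-to-end sketch (illustrative; assumes the embedding/spacy models
# are available locally, and that the supporting microservices are running if
# titles_flag or create_triples is enabled):
if __name__ == "__main__":
    sample_docs = [
        "Gradient descent updates parameters along the negative gradient.",
        "Backpropagation computes gradients layer by layer.",
        "Convolutional networks share weights across spatial positions.",
        "Recurrent networks process sequences one step at a time.",
    ]
    tree = tornado.ioloop.IOLoop.current().run_sync(
        lambda: create_recursive_topic_tree(
            sample_docs, titles_flag=False, create_triples=False))
    # With titles_flag=False the "title" fields remain integer indices.
    print(json.dumps(tree, indent=2, default=str))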