microservices/skill_service/src/services/ingest_osn.py (164 lines of code) (raw):

""" Service for following routes: /import/local-csv /import/gcs-csv """ import csv import asyncio from concurrent.futures import ThreadPoolExecutor from io import StringIO from services.data_source import upsert_data_source_doc from common.models import Skill, Category from common.utils.gcs_adapter import get_blob_from_gcs_path from common.utils.errors import ValidationError # pylint: disable = broad-exception-raised class AsyncInserts: """Class to insert data in async fashion with insert_data() function""" def __init__(self): self.inserted_skills = [] self.inserted_categories = [] pass def insert_skill(self, skill): obj = Skill() obj = obj.from_dict(skill) obj.uuid = "" obj.save() obj.uuid = obj.id obj.update() self.inserted_skills.append(skill) def insert_category(self, category): obj = Category() obj = obj.from_dict(category) obj.uuid = "" obj.save() obj.uuid = obj.id obj.update() self.inserted_categories.append(category) async def start_insert_process(self, data, obj_type): with ThreadPoolExecutor(max_workers=30) as executor: loop = asyncio.get_event_loop() tasks = [] for data_obj in data: if obj_type == "skill": runner = loop.run_in_executor(executor, self.insert_skill, data_obj) elif obj_type == "category": runner = loop.run_in_executor(executor, self.insert_category, data_obj) tasks.append(runner) for _ in await asyncio.gather(*tasks): pass def insert_data(self, data, data_type): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) future = asyncio.ensure_future(self.start_insert_process(data, data_type)) loop.run_until_complete(future) def get_inserted_skills_count(self): return len(self.inserted_skills) def get_inserted_categories_count(self): return len(self.inserted_categories) def ingest_osn_csv(path_dict): """validates, transforms and saves competencies and skills and category json as skill tree""" osn_uri = path_dict.get("osn_uri") osn_json_array = parse_and_validate_osn_csv(osn_uri) skills_count = import_skills(osn_json_array) _ = upsert_data_source_doc("skill", "osn") msg = f"Imported {skills_count} skills" response = {"success": True, "message": msg, "data": {}} return response def parse_and_validate_osn_csv(osn_uri): blob = get_blob_from_gcs_path(osn_uri) osn_json_array = parse_gcs_csv_file(blob) try: validate_osn_csv(osn_json_array) except Exception as e: raise Exception(str(e)) from e return osn_json_array def parse_gcs_csv_file(blob): """downloads csv from gcs and returns it as json""" # download file as bytes file_data = blob.download_as_bytes() byte_content = file_data content = byte_content.decode() file = StringIO(content) csv_reader = csv.DictReader(file, delimiter=",") return list(csv_reader) def validate_osn_csv(data): """checks if all required columns are present in osn csv""" fields = [ "Canonical URL", "RSD Name", "Author", "Skill Statement", "Category", "Keywords", "Certifications", "Occupation Major Groups", "Occupation Minor Groups", "Broad Occupations", "Detailed Occupations", "O*Net Job Codes", "Alignment Name", "Alignment URL" ] missing_fields = [] for field in fields: if field not in data[0]: missing_fields.append(field) if len(missing_fields) > 0: fields = ", ".join(missing_fields) raise ValidationError\ (f"Following fields are missing in provided csv: '{fields}'") def import_skills(osn_json_array): """extracts and saves the skills as per skill fireo model""" inserted_skills = [] for osn_skill in osn_json_array: skill_id = osn_skill.get("Canonical URL", "").split("/")[-1] aligned_id = osn_skill.get("Alignment URL", "").split("/")[-1] occ_major_group = osn_skill.get("Occupation Major Groups").split(";") if len(occ_major_group) == 1 and occ_major_group[0] == "": occ_major_group = [] occ_major_group = [i.strip() for i in occ_major_group] occ_minor_group = osn_skill.get("Occupation Minor Groups").split(";") if len(occ_minor_group) == 1 and occ_minor_group[0] == "": occ_minor_group = [] occ_minor_group = [i.strip() for i in occ_minor_group] broad_occupation = osn_skill.get("Broad Occupations").split(";") if len(broad_occupation) == 1 and broad_occupation[0] == "": broad_occupation = [] broad_occupation = [i.strip() for i in broad_occupation] detailed_occupation = osn_skill.get("Detailed Occupations").split(";") if len(detailed_occupation) == 1 and detailed_occupation[0] == "": detailed_occupation = [] detailed_occupation = [i.strip() for i in detailed_occupation] keywords = osn_skill.get("Keywords").split(";") if len(keywords) == 1 and keywords[0] == "": keywords = [] keywords = [i.strip() for i in keywords] certifications = osn_skill.get("Certifications").split(";") if len(certifications) == 1 and certifications[0] == "": certifications = [] certifications = [i.strip() for i in certifications] skill = { "name": osn_skill.get("RSD Name"), "description": osn_skill.get("Skill Statement"), "parent_nodes": {"competencies": []}, "reference_id": skill_id, "keywords": keywords, "occupations": { "occupations_major_group": occ_major_group, "occupations_minor_group": occ_minor_group, "broad_occupation": broad_occupation, "detailed_occupation": detailed_occupation }, "onet_job": osn_skill.get("O*Net Job Codes"), "alignments": { "standard_alignment": {}, "credential_alignment": {}, "skill_alignment": { "emsi": { "aligned": [{ "id": aligned_id, "name": osn_skill.get("Alignment Name"), "score": 1.0 }], "suggested": [] } }, "knowledge_alignment": {}, "role_alignment": {}, "organizational_alignment": {} }, "author": osn_skill.get("Author"), "creator": "", "organizations": [], "certifications": certifications, "type": { "id": "", "name": "" }, "source_uri": osn_skill.get("Canonical URL"), "source_name": "osn" } inserted_skills.append(skill) async_inserts = AsyncInserts() async_inserts.insert_data(inserted_skills, "skill") inserted_skills_length = async_inserts.get_inserted_skills_count() return inserted_skills_length