microservices/skill_service/src/routes/role_to_skills.py (142 lines of code) (raw):
""" Route module for parsing skills from roles """
import traceback
from fastapi import APIRouter
from schemas.skill_parsing_schema import (SkillParsingByQueryRequestModel,
SkillParsingByQueryResponseModel,
SkillParsingByIdRequestModel,
SkillParsingByIdResponseModel,
AlignAllRequestModel)
from schemas.initaiate_batchjob_schema import BatchJobModel
from schemas.error_schema import (NotFoundErrorResponseModel,
ConflictResponseModel)
from services.skill_parsing.skill_parsing import SkillParser
from services.data_source import get_data_sources
from services.batch_job import initiate_batch_job
from common.models import EmploymentRole
from common.utils.logging_handler import Logger
from common.utils.errors import (ValidationError, ResourceNotFoundException,
ConflictError)
from common.utils.http_exceptions import (InternalServerError, BadRequest,
ResourceNotFound, Conflict)
from config import (DATABASE_PREFIX, ERROR_RESPONSES,
ROLE_SKILL_MAPPING_JOB_TYPE)
# pylint: disable = broad-exception-raised,consider-using-f-string,invalid-name
# Router for the role-to-skill alignment endpoints defined in this module.
# ERROR_RESPONSES (from config) is passed as the router-level `responses`,
# and every route is registered without an additional path prefix.
router = APIRouter(
    prefix="",
    tags=["Skill Parsing from EmploymentRoles"],
    responses=ERROR_RESPONSES)
@router.post(
    "/role/skill-alignment/query",
    response_model=SkillParsingByQueryResponseModel,
    responses={404: {
        "model": NotFoundErrorResponseModel
    }})
def search_by_query(req_body: SkillParsingByQueryRequestModel):
  """Return the skills most relevant to a role name and description.

  For every requested skill alignment source, a SkillParser bound to the
  "skill_<source>" index is asked for the skills relevant to the request.

  Args:
    req_body (SkillParsingByQueryRequestModel): Required body for skill
      parsing. "name" and "description" are echoed back in the response;
      "skill_alignment_sources" selects the sources to query and falls
      back to ["snhu"] when it is missing or None.
  Raises:
    ResourceNotFound: if a ResourceNotFoundException is raised while parsing
    BadRequest: if a ValidationError is raised while parsing
    InternalServerError: for any other failure, including an alignment
      source that is not in the allowed skill sources
  Returns:
    [JSON] (SkillParsingByQueryResponseModel): {"data": {...}} holding the
      request name and description plus "aligned_skills", a dict mapping
      each alignment source to the result of
      SkillParser.get_relevant_skills for that source
  """
  try:
    request_body = req_body.__dict__
    alignment_sources = request_body.get("skill_alignment_sources")
    if alignment_sources is None:
      # Treat an explicit None like a missing key instead of letting the
      # loop below fail with a TypeError.
      alignment_sources = ["snhu"]
    ALLOWED_SKILL_SOURCES = get_data_sources("skill")[0]["source"]

    skill_dict = {}
    for alignment_source in alignment_sources:
      if alignment_source not in ALLOWED_SKILL_SOURCES:
        # NOTE(review): this is a client input error but surfaces as a 500
        # because a plain Exception is raised; consider ValidationError.
        raise Exception("{0} not a valid skill source. Allowed "
                        "\"skill_sources\" are {1}.".format(
                            alignment_source, ALLOWED_SKILL_SOURCES))
      db_index = "skill" + "_" + alignment_source
      skill_parse_obj = SkillParser(source=alignment_source, db_index=db_index)
      skill_dict[alignment_source] = skill_parse_obj.get_relevant_skills(
          request_body)

    response = {
        "name": request_body["name"],
        "description": request_body["description"],
        "aligned_skills": skill_dict
    }
    return {"data": response}
  except ResourceNotFoundException as e:
    raise ResourceNotFound(str(e)) from e
  except ValidationError as e:
    raise BadRequest(str(e)) from e
  except Exception as e:
    Logger.error(e)
    # format_exc() returns the traceback text; print_exc() returns None and
    # would only log the string "None".
    Logger.error(traceback.format_exc())
    raise InternalServerError(str(e)) from e
@router.post(
    "/role/skill-alignment/id",
    response_model=SkillParsingByIdResponseModel,
    responses={404: {
        "model": NotFoundErrorResponseModel
    }})
def align_skill_by_ids(req_body: SkillParsingByIdRequestModel):
  """Return skill candidates for mapping, for the given role ids.

  For every requested skill alignment source, a SkillParser bound to the
  "skill_<source>" index parses skills for the requested role ids (with
  update_flag=False) and the results are collected per role id and source.

  Args:
    req_body (SkillParsingByIdRequestModel): Required body for Skill Parsing.
      Reads "ids" (role ids) and "skill_alignment_sources".
  Raises:
    ResourceNotFound: if a ResourceNotFoundException is raised while parsing
    BadRequest: if a ValidationError is raised while parsing
    InternalServerError: for any other failure, including an alignment
      source that is not in the allowed skill sources
  Returns:
    [JSON] (SkillParsingByIdResponseModel): {"data": {"aligned_skills": ...}}
      where "aligned_skills" maps each role id to a dict of
      source -> list of skills parsed for that role and source
  """
  try:
    request_body = req_body.__dict__
    alignment_sources = request_body["skill_alignment_sources"]
    ALLOWED_SKILL_SOURCES = get_data_sources("skill")[0]["source"]

    # Creating empty response object: every role id gets an (initially
    # empty) list for every requested source.
    aligned_skills = {
        role_id: {source: [] for source in alignment_sources}
        for role_id in request_body["ids"]
    }

    for source in alignment_sources:
      if source not in ALLOWED_SKILL_SOURCES:
        # NOTE(review): this is a client input error but surfaces as a 500
        # because a plain Exception is raised; consider ValidationError.
        raise Exception("{0} not a valid skill source. Allowed "
                        "\"skill_sources\" are {1}.".format(
                            source, ALLOWED_SKILL_SOURCES))
      db_index = "skill" + "_" + source
      skill_parse_obj = SkillParser(source=source, db_index=db_index)
      skills_list = skill_parse_obj.parse_skills_by_role_ids(
          request_body, update_flag=False)
      Logger.info("skills here")
      Logger.info(skills_list)
      # skills_list is iterated as a mapping of role id -> skills.
      for r_id, skills in skills_list.items():
        aligned_skills[r_id][source].extend(skills)

    aligned_skill_dict = {"aligned_skills": aligned_skills}
    Logger.info(aligned_skill_dict)
    return {"data": aligned_skill_dict}
  except ResourceNotFoundException as e:
    raise ResourceNotFound(str(e)) from e
  except ValidationError as e:
    raise BadRequest(str(e)) from e
  except Exception as e:
    Logger.error(e)
    # format_exc() returns the traceback text; print_exc() returns None and
    # would only log the string "None".
    Logger.error(traceback.format_exc())
    raise InternalServerError(str(e)) from e
@router.post(
    "/role/skill-alignment/batch",
    response_model=BatchJobModel,
    responses={409: {
        "model": ConflictResponseModel
    }})
def batch_align_skill_by_ids(req_body: AlignAllRequestModel):
  """Start a batch job that aligns roles with skills.

  The request is validated first (role ids are looked up, role sources and
  skill alignment sources are checked against the allowed data sources) and
  is then handed to initiate_batch_job with ROLE_SKILL_MAPPING_JOB_TYPE.

  Args:
    req_body (AlignAllRequestModel): Required body of
      EmploymentRole to Skill Alignment. Reads "ids", "source_name" and
      "skill_alignment_sources"; the latter defaults to all allowed skill
      sources when it is missing or None.
  Raises:
    Conflict: if a ConflictError is raised
    ResourceNotFound: if a ResourceNotFoundException is raised
    InternalServerError: for any other failure, including a role or skill
      source that is not in the allowed sources
  Returns: (BatchJobModel)
    job_name: name of the batchjob created
    status: status of batchjob
  """
  try:
    request_body = req_body.__dict__
    ALLOWED_SKILL_SOURCES = get_data_sources("skill")[0]["source"]
    ALLOWED_ROLE_SOURCES = get_data_sources("role")[0]["source"]

    if request_body["ids"]:
      for id_ in request_body["ids"]:
        # Result is discarded; the lookup is only used as an existence
        # check (presumably raises ResourceNotFoundException for an unknown
        # id -- confirm against EmploymentRole.find_by_id).
        EmploymentRole.find_by_id(id_)

    role_sources = request_body["source_name"]
    if role_sources:
      for source in role_sources:
        if source not in ALLOWED_ROLE_SOURCES:
          # NOTE(review): client input error reported as a 500 because a
          # plain Exception is raised; consider a dedicated error type.
          raise Exception("{0} not a valid role source. Allowed "
                          "\"role_sources\" are {1}.".format(
                              source, ALLOWED_ROLE_SOURCES))

    skill_sources = request_body.get("skill_alignment_sources")
    if skill_sources is None:
      # Missing or explicitly None: align against every allowed source
      # rather than failing on iteration over None.
      request_body["skill_alignment_sources"] = ALLOWED_SKILL_SOURCES
    else:
      for source in skill_sources:
        if source not in ALLOWED_SKILL_SOURCES:
          raise Exception("{0} not a valid skill source. Allowed "
                          "\"skill_sources\" are {1}.".format(
                              source, ALLOWED_SKILL_SOURCES))

    env_vars = {"DATABASE_PREFIX": DATABASE_PREFIX}
    response = initiate_batch_job(request_body, ROLE_SKILL_MAPPING_JOB_TYPE,
                                  env_vars)
    Logger.info(response)
    return response
  except ConflictError as e:
    raise Conflict(str(e)) from e
  except ResourceNotFoundException as e:
    raise ResourceNotFound(str(e)) from e
  except Exception as e:
    Logger.error(e)
    # format_exc() returns the traceback text; print_exc() returns None and
    # would only log the string "None".
    Logger.error(traceback.format_exc())
    raise InternalServerError(str(e)) from e