in nl2sql/tasks/join_selection/core.py [0:0]
def __call__(self, db: Database, question: str) -> CoreJoinSelectorResult:
"""
Runs the Join Selection pipeline
"""
logger.info(f"Running {self.tasktype} ...")
prompt_params = {
"question": question,
"query": question,
"thoughts": [],
"answer": None,
"db_descriptor": {db.name: db.descriptor},
"table_name": ", ".join(db.db._usable_tables),
"table_names": list(db.db._usable_tables),
}
prepared_prompt = self.prompt.prompt_template.format(
**{
k: v
for k, v in prompt_params.items()
if k in self.prompt.prompt_template.input_variables
}
)
llm_response = self.llm.generate([prepared_prompt])
logger.debug(
f"[{self.tasktype}] : Received LLM Response : {llm_response.json()}"
)
try:
raw_response = llm_response.generations[0][0].text.strip()
except IndexError as exc:
raise ValueError(
f"Empty / Invalid Response received from LLM : {llm_response.json()}"
) from exc
parsed_response = (
self.prompt.parser.parse(raw_response)
if self.prompt.parser
else raw_response
)
processed_response = self.prompt.post_processor(parsed_response)
intermediate_steps = [
{
"tasktype": self.tasktype,
"prepared_prompt": prepared_prompt,
"llm_response": llm_response.dict(),
"raw_response": raw_response,
"parsed_response": parsed_response,
"processed_response": processed_response,
}
]
allowed_joins = {
f"{tname}.{fk.parent.name}={getattr(fk, '_colspec')}"
for tname, tobj in db.db._metadata.tables.items()
for fk in tobj.foreign_keys
if fk.parent is not None
}
allowed_joins_lower_map = {i.lower(): i for i in allowed_joins}
selected_joins = {allowed_joins_lower_map.get(i, i) for i in processed_response}
if not selected_joins:
logger.critical("No Join Selected!")
return CoreJoinSelectorResult(
db_name=db.name,
question=question,
allowed_joins=allowed_joins,
selected_joins=selected_joins,
intermediate_steps=intermediate_steps,
)