def __call__()

in nl2sql_library/nl2sql/tasks/column_selection/core.py [0:0]


    def __call__(self, db: Database, question: str) -> CoreColumnSelectorResult:
        """
        Runs the Column Selection pipeline
        """
        logger.info(f"Running {self.tasktype} ...")
        selected_columns = []
        intermediate_steps = []

        for tablename, tabledescriptor in db.descriptor.items():
            prompt_params = {
                "question": question,
                "query": question,
                "thoughts": [],
                "answer": None,
                "db_descriptor": {db.name: {tablename: tabledescriptor}},
                "table_name": tablename,
                "table_names": list(db.db._usable_tables),
            }
            prepared_prompt = self.prompt.prompt_template.format(
                **{
                    k: v
                    for k, v in prompt_params.items()
                    if k in self.prompt.prompt_template.input_variables
                }
            )
            llm_response = self.llm.generate([prepared_prompt])
            logger.debug(
                f"[{self.tasktype}] : Received LLM Response : {llm_response.json()}"
            )
            try:
                raw_response = llm_response.generations[0][0].text.strip()
            except IndexError as exc:
                raise ValueError(
                    f"Empty / Invalid Response received from LLM : {llm_response.json()}"
                ) from exc

            parsed_response = (
                self.prompt.parser.parse(raw_response)
                if self.prompt.parser
                else raw_response
            )
            processed_response = self.prompt.post_processor(parsed_response)
            intermediate_steps.append(
                {
                    "tasktype": self.tasktype,
                    "table": tablename,
                    "prepared_prompt": prepared_prompt,
                    "llm_response": llm_response.dict(),
                    "raw_response": raw_response,
                    "parsed_response": parsed_response,
                    "processed_response": processed_response,
                }
            )
            selected_columns.extend(processed_response)

        available_columns = {
            f"{tabname}.{colname}"
            for tabname, tabdesc in db.descriptor.items()
            for colname in tabdesc["col_descriptor"].keys()
        }
        available_columns_lower_map = {i.lower(): i for i in available_columns}
        filtered_selected_columns: set[str] = {
            available_columns_lower_map[c.lower()]
            for c in selected_columns
            if c.lower() in available_columns_lower_map
        }
        if not filtered_selected_columns:
            logger.critical("No column Selected!")
        return CoreColumnSelectorResult(
            db_name=db.name,
            question=question,
            available_columns=available_columns,
            selected_columns=filtered_selected_columns,
            intermediate_steps=intermediate_steps,
        )