in chunking/chunkers/spreadsheet_chunker.py [0:0]
def _spreadsheet_process(self):
    """
    Build one record per sheet of the workbook held in ``self.document_bytes``.

    The workbook is opened with openpyxl using ``data_only=True`` (per the
    openpyxl documentation, formula cells then yield their last cached value
    rather than the formula text). For every sheet name, the rows and headers
    returned by ``self._get_sheet_data`` are rendered with ``tabulate`` in
    ``grid`` format and passed through ``self._clean_markdown_table``.

    When ``self.chunking_by_row`` is falsy, a summary of the rendered table is
    requested from ``self.aoai_client.get_completion`` for every sheet (no
    size check is applied here); otherwise the summary is an empty string.

    Returns:
        List[dict]: One dictionary per sheet, in workbook order, with the keys
        ``name``, ``headers``, ``data``, ``table`` and ``summary``.
    """
    logging.debug(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process] Starting blob download.")
    # The bytes are already in memory; wrap them so openpyxl can read them like a file.
    stream = BytesIO(self.document_bytes)
    logging.debug(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process] Starting openpyxl load_workbook.")
    wb = load_workbook(stream, data_only=True)

    results = []
    run_started = time.time()

    for sheet_name in wb.sheetnames:
        logging.info(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process][{sheet_name}] Started processing.")
        sheet_started = time.time()

        rows, columns = self._get_sheet_data(wb[sheet_name])
        table = self._clean_markdown_table(
            tabulate(rows, headers=columns, tablefmt="grid")
        )
        sheet_dict = {
            "name": sheet_name,
            "headers": columns,
            "data": rows,
            "table": table,
        }

        if self.chunking_by_row:
            # Row-level chunking does not use a per-sheet summary.
            sheet_dict["summary"] = ""
            logging.debug(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process][{sheet_dict['name']}] Skipped summary generation (chunking by row).")
        else:
            prompt = f"Summarize the table with data in it, by understanding the information clearly.\n table_data:{table}"
            sheet_dict["summary"] = self.aoai_client.get_completion(prompt, max_tokens=2048)
            logging.debug(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process][{sheet_dict['name']}] Generated summary.")

        elapsed_time = time.time() - sheet_started
        logging.debug(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process][{sheet_dict['name']}] Processed in {elapsed_time:.2f} seconds.")
        results.append(sheet_dict)

    total_elapsed_time = time.time() - run_started
    logging.debug(f"[spreadsheet_chunker][{self.filename}][spreadsheet_process] Total processing time: {total_elapsed_time:.2f} seconds.")
    return results