# chunking/chunkers/spreadsheet_chunker.py
# Module-level imports relied on below:
import logging
import time

from tabulate import tabulate
def get_chunks(self):
"""
Splits the spreadsheet content into smaller chunks. Depending on the configuration, chunks can be created by sheet or by row.
- If chunking by sheet, the method summarizes content that exceeds the maximum chunk size.
- If chunking by row, each row is processed into its own chunk, optionally including the header row.
Returns:
List[dict]: A list of dictionaries representing the chunks created from the spreadsheet.
"""
chunks = []
logging.info(f"[spreadsheet_chunker][{self.filename}][get_chunks] Running get_chunks.")
total_start_time = time.time()
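    # Each sheet dict is expected to provide "name", "table" (a markdown
    # table) and "summary"; for row-wise chunking it also provides
    # "headers" and "data" (a list of cell-value rows), matching how the
    # fields are read below.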
sheets = self._spreadsheet_process()
logging.info(f"[spreadsheet_chunker][{self.filename}][get_chunks] Workbook has {len(sheets)} sheets")
chunk_id = 0
for sheet in sheets:
if not self.chunking_by_row:
# Original behavior: Chunk per sheet
start_time = time.time()
chunk_id += 1
logging.debug(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Starting processing chunk {chunk_id} (sheet).")
table_content = sheet["table"]
table_content = self._clean_markdown_table(table_content)
table_tokens = self.token_estimator.estimate_tokens(table_content)
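            # Fall back to the precomputed sheet summary when the full table
            # would exceed the token budget for a single chunk.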
if self.max_chunk_size > 0 and table_tokens > self.max_chunk_size:
logging.info(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Table has {table_tokens} tokens. Max tokens is {self.max_chunk_size}. Using summary.")
table_content = sheet["summary"]
chunk_dict = self._create_chunk(
chunk_id=chunk_id,
content=table_content,
                summary=sheet["summary"],
                embedding_text=sheet["summary"] if sheet["summary"] else table_content,
title=sheet["name"]
)
chunks.append(chunk_dict)
elapsed_time = time.time() - start_time
logging.debug(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Processed chunk {chunk_id} in {elapsed_time:.2f} seconds.")
else:
# New behavior: Chunk per row
logging.info(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Starting row-wise chunking.")
headers = sheet.get("headers", [])
rows = sheet.get("data", [])
for row_index, row in enumerate(rows, start=1):
                # Skip rows with no non-empty cells; str() guards against
                # non-string cell values (e.g. numbers), which have no .strip().
                if not any(str(cell).strip() for cell in row if cell is not None):
                    continue
chunk_id += 1
start_time = time.time()
logging.debug(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Processing chunk {chunk_id} for row {row_index}.")
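                # tabulate(..., tablefmt="github") renders a pipe table, e.g.
                # for headers ["Name", "Qty"] and row ["Widget", "3"]:
                #   | Name   | Qty |
                #   |--------|-----|
                #   | Widget | 3   |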
                if self.include_header_in_chunks:
                    table = tabulate([row], headers=headers, tablefmt="github")
                else:
                    # With the flag off, the header row is left out of the chunk.
                    table = tabulate([row], tablefmt="github")
table = self._clean_markdown_table(table)
                summary = ""
                table_tokens = self.token_estimator.estimate_tokens(table)
                if self.max_chunk_size > 0 and table_tokens > self.max_chunk_size:
                    # Oversized rows are kept intact and only flagged in the
                    # log; no truncation is applied.
                    logging.info(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Row table has {table_tokens} tokens. Max tokens is {self.max_chunk_size}. Keeping the full row content.")
                content = table
                embedding_text = table
chunk_dict = self._create_chunk(
chunk_id=chunk_id,
content=content,
summary=summary,
embedding_text=embedding_text,
title=f"{sheet['name']} - Row {row_index}"
)
chunks.append(chunk_dict)
elapsed_time = time.time() - start_time
logging.debug(f"[spreadsheet_chunker][{self.filename}][get_chunks][{sheet['name']}] Processed chunk {chunk_id} in {elapsed_time:.2f} seconds.")
total_elapsed_time = time.time() - total_start_time
logging.debug(f"[spreadsheet_chunker][{self.filename}][get_chunks] Finished get_chunks. Created {len(chunks)} chunks in {total_elapsed_time:.2f} seconds.")
return chunks
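
# --- Usage sketch (illustrative only) ---
# A minimal example of driving get_chunks in both modes. The constructor
# arguments shown (filename, chunking_by_row, include_header_in_chunks,
# max_chunk_size) are assumptions inferred from the attributes this method
# reads, not a confirmed SpreadsheetChunker signature, and the chunk dict
# keys are assumed to mirror the _create_chunk() arguments.
#
#   chunker = SpreadsheetChunker(
#       filename="inventory.xlsx",
#       chunking_by_row=True,            # one chunk per non-empty row
#       include_header_in_chunks=True,   # prepend the header row to each chunk
#       max_chunk_size=512,              # token budget checked per chunk
#   )
#   for chunk in chunker.get_chunks():
#       print(chunk["title"], chunk["content"][:80])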