in yourbench/pipeline/chunking.py
import time
from dataclasses import asdict  # chunk objects are assumed to be dataclasses
from typing import Any

from loguru import logger  # logger.success() below implies loguru
from tqdm import tqdm

# custom_load_dataset, custom_save_dataset, extract_config, chunk_document,
# create_multihop_chunks, SingleHopChunk, and MultiHopChunk are assumed to be
# imported from sibling modules in the yourbench package.


def run(config: dict[str, Any]) -> None:
    """
    Main entry point for the chunking pipeline stage.

    Args:
        config: Pipeline configuration dictionary.
    """
    chunking_config = config.get("pipeline", {}).get("chunking", {})
    if not chunking_config.get("run", False):
        logger.info("Chunking stage is disabled. Skipping.")
        return

    logger.info("Starting chunking stage...")
    # Load dataset
    dataset = custom_load_dataset(config=config, subset="summarized")
    logger.info(f"Loaded {len(dataset)} documents for chunking")

    # Extract configuration
    params = extract_config(config)
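    # `params` is expected to expose at least max_tokens, h_min, h_max and
    # num_multihops_factor; those are the only fields the loop below reads.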
    # Process documents
    all_single_chunks: list[list[SingleHopChunk]] = []
    all_multihop_chunks: list[list[MultiHopChunk]] = []
    start_time = time.time()
    for idx, row in enumerate(tqdm(dataset, desc="Chunking documents")):
        doc_text = row.get("document_text", "")
        doc_id = row.get("document_id", f"doc_{idx}")

        # Create single-hop chunks
        single_chunks = chunk_document(doc_text, doc_id, params.max_tokens)

        # Create multi-hop chunks
        multihop_chunks = create_multihop_chunks(
            single_chunks, params.h_min, params.h_max, params.num_multihops_factor
        )

        all_single_chunks.append(single_chunks)
        all_multihop_chunks.append(multihop_chunks)

        # Progress logging
        if (idx + 1) % 100 == 0:
            elapsed = time.time() - start_time
            rate = (idx + 1) / elapsed
            logger.info(f"Progress: {idx + 1}/{len(dataset)} docs ({rate:.1f} docs/sec)")
    # Add columns to dataset
    dataset = dataset.add_column("chunks", [[asdict(chunk) for chunk in chunks] for chunks in all_single_chunks])
    dataset = dataset.add_column(
        "multihop_chunks", [[asdict(mh) for mh in multihops] for multihops in all_multihop_chunks]
    )

    # Save dataset
    custom_save_dataset(dataset=dataset, config=config, subset="chunked")

    elapsed_total = time.time() - start_time
    logger.success(f"Chunking completed in {elapsed_total:.1f} seconds")
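
# Illustrative usage sketch (not part of the module). Only pipeline.chunking.run
# is read directly by run() above; the key names under "chunking_configuration"
# are assumptions about what extract_config() consumes, not the confirmed schema.
if __name__ == "__main__":
    example_config: dict[str, Any] = {
        "pipeline": {
            "chunking": {
                "run": True,  # the stage is skipped entirely when this is falsy
                # hypothetical parameter block; extract_config() defines the real schema
                "chunking_configuration": {
                    "max_tokens": 256,
                    "h_min": 2,
                    "h_max": 5,
                    "num_multihops_factor": 2,
                },
            }
        }
    }
    run(example_config)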