# run() — hierarchical summarization stage
# from yourbench/pipeline/summarization.py


def run(config: dict[str, Any]) -> None:
    """Executes the hierarchical summarization pipeline."""
    settings = config.get("pipeline", {}).get("summarization", {})
    if not settings.get("run", False):
        logger.info("Summarization stage disabled – skipping.")
        return

    # Chunking parameters: per-chunk token budget, overlap, and tokenizer encoding.
    max_tokens: int = settings.get("max_tokens", 16384)
    overlap: int = settings.get("token_overlap", 128)
    encoding_name: str = settings.get("encoding_name", "cl100k_base")

    logger.info("=== Summarization v2 – map-reduce ===")

    dataset = custom_load_dataset(config=config, subset="ingested")
    if not dataset or len(dataset) == 0:
        logger.warning("Ingested dataset is empty or None – nothing to summarise.")
        return
    logger.info(f"Loaded {len(dataset)} documents for summarization.")

    # --- Map phase: one inference call per document chunk. ---
    chunk_calls, call_map = _build_chunk_calls(dataset, max_tokens, overlap, encoding_name)
    chunk_responses = run_inference(config=config, step_name="summarization", inference_calls=chunk_calls)
    model_name, raw_by_doc, clean_by_doc = _collect_chunk_summaries(chunk_responses, call_map, len(dataset))

    # --- Reduce phase: combine chunk summaries for multi-chunk documents. ---
    combine_calls, combine_doc_indices = _build_combine_calls(clean_by_doc)

    raw_combined: list[str] = []
    if combine_calls:
        combine_responses = run_inference(config=config, step_name="summarization", inference_calls=combine_calls)
        if not combine_responses:
            # Inference returned nothing: keep placeholder empties, one per combined doc.
            raw_combined = [""] * len(combine_doc_indices)
        else:
            combine_model = next(iter(combine_responses))
            if model_name and combine_model != model_name:
                logger.warning(f"Different model used in combine stage: {combine_model} vs {model_name}")
            raw_combined = combine_responses.get(combine_model, [])

    # Default final summary per document: its first chunk summary (or "" if none).
    final_summaries: list[str] = [per_doc[0] if per_doc else "" for per_doc in clean_by_doc]

    if combine_calls and raw_combined:
        final_summaries = _merge_final_summaries(final_summaries, raw_combined, combine_doc_indices)

    # Scatter combine-stage raw outputs back to their full-dataset positions.
    full_raw_combined = [""] * len(dataset)
    for pos, doc_idx in enumerate(combine_doc_indices):
        if pos < len(raw_combined):
            full_raw_combined[doc_idx] = raw_combined[pos]

    # Attach all derived columns and persist the enriched dataset.
    new_columns = (
        ("raw_chunk_summaries", raw_by_doc),
        ("chunk_summaries", clean_by_doc),
        ("raw_document_summary", full_raw_combined),
        ("document_summary", final_summaries),
        ("summarization_model", [model_name or "unknown"] * len(dataset)),
    )
    for column_name, values in new_columns:
        dataset = dataset.add_column(column_name, values)

    custom_save_dataset(dataset=dataset, config=config, subset="summarized")
    logger.success(f"Hierarchical summarization completed ({len(dataset)} documents).")