def run()

in yourbench/pipeline/lighteval.py


def run(config: Dict[str, Any]) -> None:
    """
    Main entry point for the lighteval pipeline stage.

    This stage merges the single-shot, multi-hop, and cross-document question
    datasets with chunked document metadata and document summaries into a
    unified "light evaluation" dataset containing the columns:

      1. question
      2. additional_instructions
      3. ground_truth_answer
      4. gold
      5. choices
      6. question_category
      7. kind
      8. estimated_difficulty
      9. citations
      10. document_id
      11. chunk_ids
      12. question_generating_model
      13. chunks
      14. document
      15. document_summary
    The result is saved under the subset name specified in config["pipeline"]["lighteval"]["output_subset"].

    Args:
        config (Dict[str, Any]):
            The entire pipeline configuration. Recognized fields (each optional,
            falling back to the defaults shown in the code below):
            - config["pipeline"]["lighteval"]["run"] (bool): Whether to run this stage (defaults to False).
            - config["pipeline"]["lighteval"]["single_shot_subset"] (str): Subset containing single-shot questions.
            - config["pipeline"]["lighteval"]["multi_hop_subset"]   (str): Subset containing multi-hop questions.
            - config["pipeline"]["lighteval"]["cross_doc_subset"]   (str): Subset containing cross-document questions.
            - config["pipeline"]["lighteval"]["chunked_subset"]     (str): Subset containing chunked documents.
            - config["pipeline"]["lighteval"]["summarized_subset"]  (str): Subset containing document summaries.
            - config["pipeline"]["lighteval"]["output_subset"]      (str): Subset name for saving the final dataset.
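
    Example:
        A minimal configuration (the subset names shown are the defaults used below)::

            config = {
                "pipeline": {
                    "lighteval": {
                        "run": True,
                        "single_shot_subset": "single_shot_questions",
                        "multi_hop_subset": "multi_hop_questions",
                        "cross_doc_subset": "cross_document_questions",
                        "chunked_subset": "chunked",
                        "summarized_subset": "summarized",
                        "output_subset": "lighteval",
                    }
                }
            }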

    Returns:
        None. The merged dataset is saved to disk or HF Hub as configured.
    """
    stage_cfg = config.get("pipeline", {}).get("lighteval", {})
    if not stage_cfg.get("run", False):
        logger.info("lighteval stage is disabled. Skipping.")
        return

    logger.info("Saving lighteval compatible dataset")

    # Use configurable subset names with fallbacks
    single_shot_subset = stage_cfg.get("single_shot_subset", "single_shot_questions")
    multi_hop_subset = stage_cfg.get("multi_hop_subset", "multi_hop_questions")
    cross_doc_subset = stage_cfg.get("cross_doc_subset", "cross_document_questions")
    chunked_subset = stage_cfg.get("chunked_subset", "chunked")
    summarized_subset = stage_cfg.get("summarized_subset", "summarized")
    output_subset = stage_cfg.get("output_subset", "lighteval")

    # Load datasets
    try:
        single_shot_ds = custom_load_dataset(config=config, subset=single_shot_subset)
        logger.info(f"Loaded single-shot Q subset with {len(single_shot_ds)} rows.")
    except Exception as e:
        logger.warning(f"Could not load single-shot subset: {e}")
        single_shot_ds = Dataset.from_dict({})

    try:
        multi_hop_ds = custom_load_dataset(config=config, subset=multi_hop_subset)
        logger.info(f"Loaded multi-hop Q subset with {len(multi_hop_ds)} rows.")
    except Exception as e:
        logger.warning(f"Could not load multi-hop subset: {e}")
        multi_hop_ds = Dataset.from_dict({})

    try:
        cross_doc_ds = custom_load_dataset(config=config, subset=cross_doc_subset)
        logger.info(f"Loaded cross-document Q subset with {len(cross_doc_ds)} rows.")
    except Exception as e:
        logger.warning(f"Could not load cross-document subset: {e}")
        cross_doc_ds = Dataset.from_dict({})  # empty fallback

    try:
        chunked_ds = custom_load_dataset(config=config, subset=chunked_subset)
        logger.info(f"Loaded chunked subset with {len(chunked_ds)} rows.")
    except Exception as e:
        logger.error(f"Could not load chunked subset: {e}")
        logger.warning("Cannot proceed with chunk text or document text. They will be empty.")
        chunked_ds = Dataset.from_dict({})  # empty fallback

    try:
        summarized_ds = custom_load_dataset(config=config, subset=summarized_subset)
        logger.info(f"Loaded summarized subset with {len(summarized_ds)} rows.")
    except Exception as e:
        logger.error(f"Could not load summarized subset: {e}")
        summarized_ds = Dataset.from_dict({})

    if len(single_shot_ds) == 0 and len(multi_hop_ds) == 0 and len(cross_doc_ds) == 0:
        logger.error("No data in single-shot, multi-hop, or cross-document datasets. Exiting.")
        return

    # Prepare lookups from chunked dataset
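    # Shape: doc_meta_map[document_id] = {"document_text": str, "chunks_map": {chunk_id: chunk_text}};
    # the loop over summarized_ds below adds a "document_summary" key where one exists.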
    doc_meta_map = {}
    for row in chunked_ds:
        doc_id = row.get("document_id", "")
        doc_text = row.get("document_text", "")
        # Build a map from chunk_id to chunk_text for single-hop lookups
        chunk_dict = {chunk.get("chunk_id", ""): chunk.get("chunk_text", "") for chunk in row.get("chunks", [])}
        doc_meta_map[doc_id] = {"document_text": doc_text, "chunks_map": chunk_dict}

    for row in summarized_ds:
        doc_id = row.get("document_id", "")
        if doc_id in doc_meta_map:
            doc_meta_map[doc_id].update({"document_summary": row.get("document_summary")})

    # Helper functions to transform a row
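    # All three helpers below emit the same record schema; they differ only in the
    # "kind" value, how chunk ids are gathered, and which question-generation
    # stage's config supplies question_mode.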
    def make_single_shot_record(row: Dict[str, Any]) -> Dict[str, Any]:
        doc_id = row.get("document_id", "")
        chunk_id = row.get("chunk_id", "")

        # Grab doc meta
        doc_meta = doc_meta_map.get(doc_id, {})
        doc_text = doc_meta.get("document_text", "")
        doc_summary = doc_meta.get("document_summary", "")
        chunk_text = doc_meta.get("chunks_map", {}).get(chunk_id, "")

        # For multiple-choice questions, convert the letter answer to a
        # zero-based index (e.g. "B" -> [1]); otherwise wrap the free-form
        # answer in a single-element list.
        gold = row.get("self_answer", "")
        if not gold:
            logger.warning("Row has an empty self_answer.")

        stage_cfg_local = config.get("pipeline", {}).get("single_shot_question_generation", {})
        if stage_cfg_local.get("question_mode") == "multi-choice":
            gold = [ord(gold) - ord("A")] if gold else [0]
        else:
            gold = [gold]

        return {
            "question": row.get("question", ""),
            "additional_instructions": row.get("additional_instructions", ""),
            "ground_truth_answer": row.get("self_answer", ""),
            "gold": gold,
            "choices": row.get("choices", []),
            "question_category": row.get("self_assessed_question_type", "unknown"),
            "kind": "single_shot",
            "estimated_difficulty": row.get("estimated_difficulty", 5),
            "citations": row.get("citations", []),
            "document_id": doc_id,
            "chunk_ids": [chunk_id] if chunk_id else [],
            "question_generating_model": row.get("generating_model", ""),
            "chunks": [chunk_text] if chunk_text else [],
            "document": doc_text,
            "document_summary": doc_summary,
        }

    def make_multi_hop_record(row: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform a multi-hop question row into a standardized dictionary
        for the final lighteval dataset.
        """
        doc_id: str = row.get("document_id", "")
        # e.g. row["source_chunk_ids"]: List[str]
        chunk_ids: List[str] = row.get("source_chunk_ids", [])
        doc_meta = doc_meta_map.get(doc_id, {})
        doc_text = doc_meta.get("document_text", "")
        doc_summary = doc_meta.get("document_summary", "")
        chunk_texts = [doc_meta.get("chunks_map", {}).get(cid, "") for cid in chunk_ids if cid]

        # Same answer normalization as in make_single_shot_record, driven by the
        # multi-hop stage's question_mode.
        gold = row.get("self_answer", "")
        if not gold:
            logger.warning("Row has an empty self_answer.")

        stage_cfg_local = config.get("pipeline", {}).get("multi_hop_question_generation", {})
        if stage_cfg_local.get("question_mode") == "multi-choice":
            gold = [ord(gold) - ord("A")] if gold else [0]
        else:
            gold = [gold]

        return {
            "question": row.get("question", ""),
            "additional_instructions": row.get("additional_instructions", ""),
            "ground_truth_answer": row.get("self_answer", ""),
            "gold": gold,
            "choices": row.get("choices", []),
            "question_category": row.get("self_assessed_question_type", "unknown"),
            "kind": "multi_hop",
            "estimated_difficulty": row.get("estimated_difficulty", 5),
            "citations": row.get("citations", []),
            "document_id": doc_id,
            "chunk_ids": chunk_ids,
            "question_generating_model": row.get("generating_model", ""),
            "chunks": chunk_texts,
            "document": doc_text,
            "document_summary": doc_summary,
        }

    def make_cross_document_record(row: Dict[str, Any]) -> Dict[str, Any]:
        doc_id = row.get("document_id", "")
        chunk_ids = row.get("source_chunk_ids", [])
        doc_meta = doc_meta_map.get(doc_id, {})
        doc_text = doc_meta.get("document_text", "")
        doc_summary = doc_meta.get("document_summary", "")
        chunk_texts = [doc_meta.get("chunks_map", {}).get(cid, "") for cid in chunk_ids if cid]

        gold = row.get("self_answer", "")
        if not gold:
            logger.warning("Row has an empty self_answer.")

        # Cross-document questions reuse the multi-hop stage's question_mode for
        # the same answer normalization.
        stage_cfg_local = config.get("pipeline", {}).get("multi_hop_question_generation", {})
        if stage_cfg_local.get("question_mode") == "multi-choice":
            gold = [ord(gold) - ord("A")] if gold else [0]
        else:
            gold = [gold]

        return {
            "question": row.get("question", ""),
            "additional_instructions": row.get("additional_instructions", ""),
            "ground_truth_answer": row.get("self_answer", ""),
            "gold": gold,
            "choices": row.get("choices", []),
            "question_category": row.get("self_assessed_question_type", "unknown"),
            "kind": "cross_document",
            "estimated_difficulty": row.get("estimated_difficulty", 5),
            "citations": row.get("citations", []),
            "document_id": doc_id,
            "chunk_ids": chunk_ids,
            "question_generating_model": row.get("generating_model", ""),
            "chunks": chunk_texts,
            "document": doc_text,
            "document_summary": doc_summary,
        }

    # Final combination
    combined_records = (
        [make_single_shot_record(row) for row in single_shot_ds]
        + [make_multi_hop_record(row) for row in multi_hop_ds]
        + [make_cross_document_record(row) for row in cross_doc_ds]
    )

    if not combined_records:
        logger.warning("No final records to merge in lighteval. Exiting.")
        return

    # Create a Hugging Face Dataset
    logger.info(f"Assembling final dataset with {len(combined_records)} rows.")
    try:
        final_ds = Dataset.from_list(combined_records)
    except Exception:
        logger.exception("Failed to create final dataset object")
        return

    # Save dataset
    custom_save_dataset(dataset=final_ds, config=config, subset=output_subset)
    logger.success("Lighteval dataset saved successfully.")
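
Usage sketch (illustrative, not part of the module): calling the stage directly, assuming the package exposes this file as yourbench.pipeline.lighteval and the source subsets already exist.

    from yourbench.pipeline import lighteval

    # Minimal invocation: enable the stage and accept every default subset name.
    lighteval.run({"pipeline": {"lighteval": {"run": True}}})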