def run()

in yourbench/pipeline/upload_ingest_to_hub.py [0:0]


def run(config: dict[str, Any]) -> None:
    """
    Primary function to execute the 'upload_ingest_to_hub' stage.

    This function aggregates markdown documents from a given source directory
    (configured in `pipeline.upload_ingest_to_hub.source_documents_dir`) into a
    Hugging Face Dataset, which is then saved locally or pushed to the Hub.

    Args:
        config (dict[str, Any]):
            The overall pipeline configuration dictionary. Relevant keys:

            - config["pipeline"]["upload_ingest_to_hub"]["run"] (bool):
                Whether to run this stage.
            - config["pipeline"]["upload_ingest_to_hub"]["source_documents_dir"] (str):
                Directory path for the ingested markdown files.
            - config["hf_configuration"]["token"] (str, optional):
                Hugging Face token for authentication if uploading a private dataset.
            - config["hf_configuration"]["private"] (bool):
                Whether to keep the dataset private on the Hub (defaults to True).
            - config["hf_configuration"]["global_dataset_name"] (str):
                Base dataset name on Hugging Face (can be overridden).
            - config["pipeline"]["upload_ingest_to_hub"]["output_dataset_name"] (str, optional):
                The name of the dataset to save to/push to on the Hugging Face Hub.
            - config["pipeline"]["upload_ingest_to_hub"]["output_subset"] (str, optional):
                Subset name for partial saving (default is this stage name).

    Raises:
        ValueError:
            If `source_documents_dir` is missing in the config, indicating incomplete config.
    """
    stage_name = "upload_ingest_to_hub"
    stage_cfg = config.get("pipeline", {}).get(stage_name, {})

    # Check if this stage is turned off in config
    if not stage_cfg.get("run", False):
        logger.info(f"Stage '{stage_name}' is disabled. Skipping.")
        return

    source_dir: Optional[str] = stage_cfg.get("source_documents_dir")

    # If source_dir is not provided, try to get it from the ingestion stage output
    if not source_dir:
        logger.info(
            f"'source_documents_dir' not specified for '{stage_name}'. "
            f"Attempting to use 'output_dir' from the 'ingestion' stage."
        )
        ingestion_cfg = config.get("pipeline", {}).get("ingestion", {})
        print(ingestion_cfg)
        source_dir = ingestion_cfg.get("output_dir")

    if not source_dir:
        error_msg = (
            f"Missing required directory configuration. Please specify either "
            f"'source_documents_dir' in pipeline.{stage_name} or "
            f"'output_dir' in pipeline.ingestion."
        )
        logger.error(error_msg)
        raise ValueError(error_msg)

    logger.info(f"Using source directory: {source_dir}")

    # Collect .md files
    md_file_paths = glob.glob(os.path.join(source_dir, "*.md"))
    if not md_file_paths:
        raise FileNotFoundError(f"No .md files found in '{source_dir}'.")

    # Read them into Python objects
    ingested_documents = _collect_markdown_files(md_file_paths)
    if not ingested_documents:
        raise FileNotFoundError(f"No valid markdown documents parsed in '{source_dir}'.")

    # Convert the ingested markdown docs to a Hugging Face Dataset
    dataset = _convert_ingested_docs_to_dataset(ingested_documents)

    # Save or push the dataset to the configured location
    custom_save_dataset(dataset=dataset, config=config, subset="ingested")
    logger.success(f"Successfully completed '{stage_name}' stage.")