def run()

in yourbench/pipeline/ingestion.py [0:0]


def run(config: dict[str, Any]) -> None:
    """
    Execute the ingestion stage of the pipeline.

    The stage settings are obtained from ``_extract_ingestion_config(config)``;
    the resulting object is read for its ``run``, ``source_documents_dir`` and
    ``output_dir`` attributes. When the stage is enabled and both directories
    are set, the function:

    1. Creates the output directory if it does not already exist.
    2. Builds a markdown processor via ``_initialize_markdown_processor(config)``.
    3. Recursively collects every regular file under the source directory.
    4. Hands each file to ``_convert_document_to_markdown`` together with the
       output directory and the processor.

    The function returns early (after logging) when the stage is disabled,
    when either directory setting is missing/empty, or when no regular files
    are found under the source directory.

    Args:
        config (dict[str, Any]): Pipeline configuration. The keys below are
            the ones the helpers are expected to consume; the exact schema is
            defined by those helpers, not by this function:
            - config["pipeline"]["ingestion"]["run"] (bool): Whether to run ingestion.
            - config["pipeline"]["ingestion"]["source_documents_dir"] (str): Directory containing source documents.
            - config["pipeline"]["ingestion"]["output_dir"] (str): Directory where .md files will be saved.
            - config["model_roles"]["ingestion"] (Optional[list[str]]): Model names for LLM ingestion support.
            - config["model_list"] (Optional[list[dict[str, str]]]): Detailed LLM model configs.

    Returns:
        None

    Logs:
        Progress, warnings and errors are emitted through the module-level
        ``logger``. Where those records end up (e.g. a log file) depends on
        how the logger is configured elsewhere.
    """
    ingestion_config = _extract_ingestion_config(config)

    # Guard: stage switched off in the configuration.
    if not ingestion_config.run:
        logger.info("Ingestion stage is disabled. No action will be taken.")
        return

    # Guard: both directories are mandatory for the stage to do anything useful.
    if not ingestion_config.source_documents_dir or not ingestion_config.output_dir:
        logger.error("Missing 'source_documents_dir' or 'output_dir' in ingestion config. Cannot proceed.")
        return

    # Ensure the output directory exists before any conversion writes into it.
    os.makedirs(ingestion_config.output_dir, exist_ok=True)
    logger.debug("Prepared output directory: {}", ingestion_config.output_dir)

    # Initialize the markdown processor (may include an LLM if configured).
    markdown_processor = _initialize_markdown_processor(config)

    # A recursive "**" pattern also yields the source directory itself and all
    # of its sub-directories, so the raw match list is non-empty even when the
    # tree holds no documents. Keep only regular files so that the emptiness
    # check below reflects whether there is actually anything to convert.
    matched_paths = glob.glob(os.path.join(ingestion_config.source_documents_dir, "**"), recursive=True)
    source_files = [path for path in matched_paths if os.path.isfile(path)]
    if not source_files:
        logger.warning(
            "No files found in source directory: {}",
            ingestion_config.source_documents_dir,
        )
        return

    logger.info(
        "Ingestion stage: Converting files from '{}' to '{}'...",
        ingestion_config.source_documents_dir,
        ingestion_config.output_dir,
    )

    # Convert every discovered file; directories were already filtered out.
    for file_path in source_files:
        _convert_document_to_markdown(
            file_path=file_path,
            output_dir=ingestion_config.output_dir,
            markdown_processor=markdown_processor,
        )

    logger.success(
        "Ingestion stage complete: Processed files from '{}' and saved Markdown to '{}'.",
        ingestion_config.source_documents_dir,
        ingestion_config.output_dir,
    )