in yourbench/pipeline/ingestion.py [0:0]
def run(config: dict[str, Any]) -> None:
    """
    Execute the ingestion stage of the pipeline.

    When the stage is enabled, every regular file found (recursively) under
    the configured source directory is handed to
    `_convert_document_to_markdown`, which is given the configured output
    directory and the processor built by `_initialize_markdown_processor`.

    The function returns early, without converting anything, when:
      * the stage is disabled (logged at info level);
      * the source or output directory setting is missing/empty (logged as
        an error);
      * the source directory contains no regular files (logged as a
        warning). This includes a source directory that does not exist or
        that only contains sub-directories.

    Args:
        config: The full pipeline configuration. The ingestion settings
            (`run`, `source_documents_dir`, `output_dir`) are obtained via
            `_extract_ingestion_config(config)`; the same dictionary is also
            passed to `_initialize_markdown_processor`. Per the original
            documentation these live under `config["pipeline"]["ingestion"]`,
            with optional LLM settings under `config["model_roles"]` and
            `config["model_list"]` -- NOTE(review): confirm against the
            helper implementations.

    Returns:
        None
    """
    # Extract typed configuration from the raw dictionary.
    ingestion_config = _extract_ingestion_config(config)

    # Guard: stage switched off.
    if not ingestion_config.run:
        logger.info("Ingestion stage is disabled. No action will be taken.")
        return

    # Guard: both directories are required to do any work.
    if not ingestion_config.source_documents_dir or not ingestion_config.output_dir:
        logger.error("Missing 'source_documents_dir' or 'output_dir' in ingestion config. Cannot proceed.")
        return

    # Ensure the output directory exists before any conversion writes to it.
    os.makedirs(ingestion_config.output_dir, exist_ok=True)
    logger.debug("Prepared output directory: {}", ingestion_config.output_dir)

    # Initialize the Markdown processor (may include an LLM if configured).
    markdown_processor = _initialize_markdown_processor(config)

    # A recursive "**" glob also yields directories -- including the source
    # directory itself -- so keep only regular files *before* testing for
    # emptiness. Otherwise an empty source tree would look non-empty and the
    # stage would report success without converting anything.
    search_pattern = os.path.join(ingestion_config.source_documents_dir, "**")
    source_files = [
        candidate
        for candidate in glob.glob(search_pattern, recursive=True)
        if os.path.isfile(candidate)
    ]
    if not source_files:
        logger.warning(
            "No files found in source directory: {}",
            ingestion_config.source_documents_dir,
        )
        return

    logger.info(
        "Ingestion stage: Converting files from '{}' to '{}'...",
        ingestion_config.source_documents_dir,
        ingestion_config.output_dir,
    )

    # Convert each discovered file.
    for file_path in source_files:
        _convert_document_to_markdown(
            file_path=file_path,
            output_dir=ingestion_config.output_dir,
            markdown_processor=markdown_processor,
        )

    logger.success(
        "Ingestion stage complete: Processed files from '{}' and saved Markdown to '{}'.",
        ingestion_config.source_documents_dir,
        ingestion_config.output_dir,
    )