in yourbench/pipeline/upload_ingest_to_hub.py [0:0]
import glob
import os
from typing import Any, Optional

from loguru import logger  # loguru provides logger.success()

# _collect_markdown_files, _convert_ingested_docs_to_dataset, and
# custom_save_dataset are defined or imported elsewhere in this module.


def run(config: dict[str, Any]) -> None:
"""
Primary function to execute the 'upload_ingest_to_hub' stage.
This function aggregates markdown documents from a given source directory
(configured in `pipeline.upload_ingest_to_hub.source_documents_dir`) into a
Hugging Face Dataset, which is then saved locally or pushed to the Hub.
Args:
config (dict[str, Any]):
The overall pipeline configuration dictionary. Relevant keys:
- config["pipeline"]["upload_ingest_to_hub"]["run"] (bool):
Whether to run this stage.
- config["pipeline"]["upload_ingest_to_hub"]["source_documents_dir"] (str):
Directory path for the ingested markdown files.
- config["hf_configuration"]["token"] (str, optional):
                Hugging Face token used to authenticate when pushing to the Hub.
- config["hf_configuration"]["private"] (bool):
Whether to keep the dataset private on the Hub (defaults to True).
- config["hf_configuration"]["global_dataset_name"] (str):
Base dataset name on Hugging Face (can be overridden).
- config["pipeline"]["upload_ingest_to_hub"]["output_dataset_name"] (str, optional):
The name of the dataset to save to/push to on the Hugging Face Hub.
- config["pipeline"]["upload_ingest_to_hub"]["output_subset"] (str, optional):
Subset name for partial saving (default is this stage name).
    Raises:
        ValueError:
            If neither `source_documents_dir` (this stage) nor `output_dir`
            (ingestion stage) is configured.
        FileNotFoundError:
            If no markdown files are found, or none can be parsed, in the
            source directory.
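
    Example:
        A minimal configuration that enables this stage; directory and
        dataset names below are illustrative only:

            config = {
                "pipeline": {
                    "upload_ingest_to_hub": {
                        "run": True,
                        "source_documents_dir": "data/ingested",
                    },
                },
                "hf_configuration": {
                    "global_dataset_name": "my-org/yourbench-dataset",
                    "private": True,
                },
            }
            run(config)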
"""
stage_name = "upload_ingest_to_hub"
stage_cfg = config.get("pipeline", {}).get(stage_name, {})
# Check if this stage is turned off in config
if not stage_cfg.get("run", False):
logger.info(f"Stage '{stage_name}' is disabled. Skipping.")
return
source_dir: Optional[str] = stage_cfg.get("source_documents_dir")
# If source_dir is not provided, try to get it from the ingestion stage output
if not source_dir:
logger.info(
f"'source_documents_dir' not specified for '{stage_name}'. "
f"Attempting to use 'output_dir' from the 'ingestion' stage."
)
ingestion_cfg = config.get("pipeline", {}).get("ingestion", {})
        logger.debug(f"Ingestion stage config: {ingestion_cfg}")
source_dir = ingestion_cfg.get("output_dir")
if not source_dir:
error_msg = (
f"Missing required directory configuration. Please specify either "
f"'source_documents_dir' in pipeline.{stage_name} or "
f"'output_dir' in pipeline.ingestion."
)
logger.error(error_msg)
raise ValueError(error_msg)
logger.info(f"Using source directory: {source_dir}")
    # Collect .md files from the top level of the source directory (non-recursive)
md_file_paths = glob.glob(os.path.join(source_dir, "*.md"))
if not md_file_paths:
raise FileNotFoundError(f"No .md files found in '{source_dir}'.")
    # Parse each markdown file into an in-memory document record
ingested_documents = _collect_markdown_files(md_file_paths)
if not ingested_documents:
raise FileNotFoundError(f"No valid markdown documents parsed in '{source_dir}'.")
# Convert the ingested markdown docs to a Hugging Face Dataset
dataset = _convert_ingested_docs_to_dataset(ingested_documents)
# Save or push the dataset to the configured location
custom_save_dataset(dataset=dataset, config=config, subset="ingested")
logger.success(f"Successfully completed '{stage_name}' stage.")
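

# Minimal usage sketch (illustrative; not part of the pipeline itself).
# Assumes the pipeline config lives in a YAML file and that `pyyaml` is
# installed; the path below is a placeholder.
if __name__ == "__main__":
    import yaml

    with open("configs/example.yaml", "r", encoding="utf-8") as f:
        loaded_config = yaml.safe_load(f)

    run(loaded_config)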