src/databao_context_engine/build_sources/internal/build_runner.py (57 lines of code) (raw):

import logging
from pathlib import Path

from databao_context_engine.build_sources.internal.build_service import BuildService
from databao_context_engine.build_sources.internal.export_results import (
    append_result_to_all_results,
    create_run_dir,
    export_build_result,
)
from databao_context_engine.plugins.plugin_loader import load_plugins
from databao_context_engine.project.datasource_discovery import discover_datasources, prepare_source

logger = logging.getLogger(__name__)


def build(
    project_dir: Path,
    *,
    build_service: BuildService,
    project_id: str,
    dce_version: str,
) -> None:
    """
    Build entrypoint.

    1) Load available plugins
    2) Discover datasources under ``project_dir``; return early if there are none
    3) For each discovered datasource, prepare it and look up the plugin registered
       for its datasource type (datasources without a plugin are skipped with a warning)
    4) Start a run and create its run directory lazily, the first time a datasource
       with a matching plugin is found
    5) Process the prepared source through ``build_service`` and hand the result to
       ``export_build_result`` and ``append_result_to_all_results``
    6) Finalize the run, if one was started

    Any exception raised while handling a single datasource is logged and does not
    prevent the remaining datasources from being processed.

    Args:
        project_dir: Directory in which datasources are discovered and under which
            the run directory is created.
        build_service: Service used to start the run, process each prepared source
            and finalize the run.
        project_id: Forwarded to ``build_service.start_run``.
        dce_version: Forwarded to ``build_service.start_run``.
    """
    plugins = load_plugins()

    datasources = discover_datasources(project_dir)
    if not datasources:
        logger.info("No sources discovered under %s", project_dir)
        return

    run = None
    run_dir = None
    number_processed_datasources = 0

    for discovered_datasource in datasources:
        try:
            prepared_source = prepare_source(discovered_datasource)
            logger.info(
                'Found datasource of type "%s" with name %s',
                prepared_source.datasource_type.full_type,
                prepared_source.path.stem,
            )

            plugin = plugins.get(prepared_source.datasource_type)
            if plugin is None:
                logger.warning(
                    "No plugin for '%s' (datasource=%s) — skipping.",
                    prepared_source.datasource_type.full_type,
                    prepared_source.path,
                )
                continue

            # Initialise the run as soon as we find a source that can be built.
            # The run and its directory are initialised independently: if
            # create_run_dir fails after start_run succeeded, the next datasource
            # must reuse the already started run (and only retry the directory)
            # instead of starting a second run that would leave the first one
            # without a finalize_run call.
            if run is None:
                run = build_service.start_run(project_id=project_id, dce_version=dce_version)
            if run_dir is None:
                run_dir = create_run_dir(project_dir, run.run_name)

            result = build_service.process_prepared_source(
                run_id=run.run_id,
                prepared_source=prepared_source,
                plugin=plugin,
            )

            export_build_result(run_dir, result)
            append_result_to_all_results(run_dir, result)

            number_processed_datasources += 1
        except Exception as e:
            # Best-effort build: keep going with the other datasources. The full
            # traceback is only emitted at debug level; the short message is
            # always reported.
            logger.debug(str(e), exc_info=True, stack_info=True)
            logger.info("Failed to build source at (%s): %s", discovered_datasource.path, e)

    if run is not None:
        build_service.finalize_run(run_id=run.run_id)

    logger.info("Build complete. Processed %d datasources.", number_processed_datasources)