in src/databao_context_engine/build_sources/internal/plugin_execution.py [0:0]
def _execute(prepared_datasource: PreparedDatasource, plugin: BuildPlugin) -> Any:
    """
    Dispatch a prepared datasource to the executor matching its kind.

    A ``PreparedConfig`` is forwarded to ``execute_datasource_plugin`` along
    with its datasource type, config and datasource name. Any other prepared
    datasource is forwarded to ``execute_file_plugin`` along with its
    datasource type and path.

    Returns whatever the selected executor returns.
    """
    if not isinstance(prepared_datasource, PreparedConfig):
        # Everything that is not a PreparedConfig takes the file route.
        return execute_file_plugin(
            plugin=cast(BuildFilePlugin, plugin),
            datasource_type=prepared_datasource.datasource_type,
            file_path=prepared_datasource.path,
        )

    # NOTE(review): assuming `cast` is typing.cast, it only narrows the type
    # for the checker and does not verify the plugin at runtime.
    return execute_datasource_plugin(
        plugin=cast(BuildDatasourcePlugin, plugin),
        datasource_type=prepared_datasource.datasource_type,
        config=prepared_datasource.config,
        datasource_name=prepared_datasource.datasource_name,
    )