in databao/executors/react_duckdb/executor.py [0:0]
def register_db(self, source: DBDataSource) -> None:
"""Register DB in the DuckDB connection."""
connection = source.db_connection
if isinstance(connection, Connection):
connection = connection.engine
if isinstance(connection, duckdb.DuckDBPyConnection):
path = get_db_path(connection)
if path is not None:
connection.close()
self._duckdb_connection.execute(f"ATTACH '{path}' AS {source.name}")
else:
raise RuntimeError("Memory-based DuckDB is not supported.")
elif isinstance(connection, Engine):
register_sqlalchemy(self._duckdb_connection, connection, source.name)
else:
raise ValueError("Only DuckDB or SQLAlchemy connections are supported.")