in src/dma/collector/workflows/collection_extractor/base.py [0:0]
def collect_db_specific_data(self, execution_id: str) -> None:
dbs = self.get_all_dbs()
for db in dbs:
async_engine = get_engine(src_info=self.src_info, database=db)
with Session(async_engine) as db_session:
collection_manager = next(
provide_collection_query_manager(
db_session=db_session, execution_id=execution_id, manual_id=self.collection_identifier
)
)
db_collection = collection_manager.execute_per_db_collection_queries()
self.import_to_table(db_collection)
async_engine.dispose()