in src/databao_context_engine/plugins/databases/base_introspector.py [0:0]
def introspect_database(self, file_config: T) -> DatabaseIntrospectionResult:
matcher = IntrospectionScopeMatcher(
file_config.introspection_scope,
ignored_schemas=self._ignored_schemas(),
)
with self._connect(file_config) as root_connection:
catalogs = self._get_catalogs_adapted(root_connection, file_config)
introspected_catalogs: list[DatabaseCatalog] = []
for catalog in catalogs:
with self._connect_to_catalog(file_config, catalog) as catalog_connection:
discovered_schemas = self._list_schemas_for_catalog(catalog_connection, catalog)
selection = matcher.filter_scopes([catalog], {catalog: discovered_schemas})
selected_schemas = selection.schemas_per_catalog.get(catalog, [])
tables_by_schema: list[DatabaseSchema] = []
for schema in selected_schemas:
tables = self.collect_schema_model(catalog_connection, catalog, schema) or []
if tables:
for table in tables:
table.samples = self._collect_samples_for_table(
catalog_connection, catalog, schema, table.name
)
tables_by_schema.append(DatabaseSchema(name=schema, tables=tables))
if tables_by_schema:
introspected_catalogs.append(DatabaseCatalog(name=catalog, schemas=tables_by_schema))
return DatabaseIntrospectionResult(catalogs=introspected_catalogs)