in src/databao_context_engine/plugins/databases/snowflake_introspector.py [0:0]
def collect_schema_model(self, connection, catalog: str, schema: str) -> list[DatabaseTable] | None:
comps = self._component_queries(catalog, schema)
statements = [c["sql"].rstrip().rstrip(";") for c in comps]
batch_sql = ";\n".join(statements)
results: dict[str, list[dict]] = {c["name"]: [] for c in comps if c["name"]}
with connection.cursor(DictCursor) as cur:
cur.execute(batch_sql, num_statements=len(statements))
for ix, comp in enumerate(comps, start=1):
name = comp["name"]
rows = self._lower_keys(cur.fetchall()) if cur.description else []
if name:
results[name] = rows
if ix < len(comps):
ok = cur.nextset()
if not ok:
raise RuntimeError(
f"Snowflake multi-statement batch ended early after component #{ix} '{name}'"
)
return TableBuilder.build_from_components(
rels=results.get("relations", []),
cols=results.get("columns", []),
pk_cols=results.get("pk", []),
uq_cols=results.get("uq", []),
checks=[],
fk_cols=results.get("fks", []),
idx_cols=[],
)