in src/databao_context_engine/plugins/databases/mysql_introspector.py [0:0]
def collect_schema_model(self, connection, catalog: str, schema: str) -> list[DatabaseTable] | None:
    """Run every component query for ``schema`` in one batch and build the tables.

    The SQL of each entry returned by ``self._component_queries()`` has its
    ``{SCHEMA}`` placeholder replaced with ``self._quote_literal(schema)``;
    the statements are then joined with ``;`` and sent through a single
    ``execute`` call. The result sets are read back in the same order as the
    component names, with every column name lower-cased.

    Args:
        connection: Connection object providing ``cursor(...)``.
        catalog: Not used by this method.
        schema: Schema name substituted into each component query.

    Returns:
        Whatever ``TableBuilder.build_from_components`` returns for the
        collected component rows.

    Raises:
        RuntimeError: If ``nextset()`` reports no further result set before
            every component has been read.
    """
    queries = self._component_queries()
    total = len(queries)

    # Strip trailing whitespace and the final ';' so that joining does not
    # produce empty statements between components.
    statements = [
        text.replace("{SCHEMA}", self._quote_literal(schema)).rstrip().rstrip(";")
        for text in queries.values()
    ]

    collected: dict[str, list[dict]] = {}
    with connection.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(";\n".join(statements))

        for position, component in enumerate(queries, start=1):
            description = cursor.description
            normalised: list[dict]
            if not description:
                # No column metadata on this result set: nothing to fetch.
                normalised = []
            else:
                fetched = cursor.fetchall()
                if fetched and isinstance(fetched[0], dict):
                    normalised = [
                        {key.lower(): value for key, value in record.items()}
                        for record in fetched
                    ]
                else:
                    # Rows are not dicts (or there are none): pair each row
                    # with the lower-cased column names from the description.
                    headers = [column[0].lower() for column in description]
                    normalised = [dict(zip(headers, record)) for record in fetched]
            collected[component] = normalised

            if position == total:
                break
            # Advance to the next component's result set; a falsy return
            # means the batch produced fewer result sets than expected.
            if not cursor.nextset():
                raise RuntimeError(f"MySQL batch ended early after component #{position} '{component}'")

    return TableBuilder.build_from_components(
        rels=collected.get("relations", []),
        cols=collected.get("columns", []),
        pk_cols=collected.get("pk", []),
        uq_cols=collected.get("uq", []),
        checks=collected.get("checks", []),
        fk_cols=collected.get("fks", []),
        idx_cols=collected.get("idx", []),
    )