in src/databao_context_engine/plugins/resources/parquet_introspector.py [0:0]
def introspect(self, file_config: ParquetConfigFile) -> ParquetIntrospectionResult:
    """Collect per-column Parquet metadata for the location described by *file_config*.

    Runs ``parquet_metadata`` against the resolved URL and folds the returned
    rows into one ``ParquetColumn`` per ``(file_name, column_id)`` pair. The
    first row seen for a pair creates the column; every further row for the
    same pair is counted as one more row group and its ``num_values`` is
    added to the running total.

    Args:
        file_config: Configuration used to open the connection and to resolve
            the URL that is passed to ``parquet_metadata``.

    Returns:
        A ``ParquetIntrospectionResult`` holding one ``ParquetFile`` per
        distinct ``file_name``, each listing its columns in first-seen order.

    Raises:
        KeyError: If the metadata result lacks one of the required columns
            (``file_name``, ``column_id``, ``path_in_schema``, ``num_values``).
    """
    with self._connect(file_config) as conn, conn.cursor() as cur:
        resolved_url = _resolve_url(file_config)
        # The URL is embedded in a single-quoted SQL string literal, so any
        # single quote inside it must be doubled; otherwise a path containing
        # "'" breaks the statement (or lets text escape the literal).
        url_literal = str(resolved_url).replace("'", "''")
        cur.execute(f"SELECT * from parquet_metadata('{url_literal}')")
        # Lower-case the result column names so the key lookups below do not
        # depend on the casing reported by the driver.
        cols = [desc[0].lower() for desc in cur.description] if cur.description else []
        rows = cur.fetchall()

    # Everything needed has been fetched; the folding below is pure Python.
    columns_per_file: dict[str, dict[int, ParquetColumn]] = defaultdict(dict)
    for row in rows:
        file_meta = dict(zip(cols, row))
        file_name = file_meta["file_name"]
        column_id = file_meta["column_id"]
        num_values: int = file_meta["num_values"]

        columns = columns_per_file[file_name]
        existing: ParquetColumn | None = columns.get(column_id)
        if existing is not None:
            # NOTE(review): only num_values and row_groups are accumulated;
            # the stats_* fields keep the values from the first row seen for
            # this column and are not merged across later rows.
            columns[column_id] = replace(
                existing,
                num_values=existing.num_values + num_values,
                row_groups=existing.row_groups + 1,
            )
            continue

        columns[column_id] = ParquetColumn(
            name=file_meta["path_in_schema"],
            # Missing or empty type/min/max values are normalised to "".
            type=file_meta.get("type") or "",
            row_groups=1,
            num_values=num_values,
            stats_min=file_meta.get("stats_min") or "",
            stats_max=file_meta.get("stats_max") or "",
            stats_null_count=file_meta.get("stats_null_count"),
            stats_distinct_count=file_meta.get("stats_distinct_count"),
        )

    return ParquetIntrospectionResult(
        files=[
            ParquetFile(file_name, columns=list(columns.values()))
            for file_name, columns in columns_per_file.items()
        ]
    )