in datafusion/polars.py [0:0]
def to_polars_df(self, plan):
    """Translate a DataFusion logical plan into a Polars DataFrame.

    The plan's inputs are translated first (depth-first recursion), then the
    current node is mapped onto the equivalent Polars operation. Supported
    operators are ``Projection``, ``Aggregate`` (``COUNT`` only) and
    ``TableScan`` (read from the Parquet path registered in
    ``self.parquet_tables``).

    :param plan: logical plan node; must provide ``inputs()`` and
        ``to_variant()``.
    :returns: the Polars DataFrame produced by this plan node.
    :raises Exception: for an unsupported logical operator or an unsupported
        aggregate function.
    """
    # Recurse down first to translate inputs into Polars data frames.
    inputs = [self.to_polars_df(x) for x in plan.inputs()]
    # Get the Python wrapper for the logical operator node.
    node = plan.to_variant()
    if isinstance(node, Projection):
        args = [self.to_polars_expr(expr) for expr in node.projections()]
        return inputs[0].select(*args)
    elif isinstance(node, Aggregate):
        groupby_expr = [
            self.to_polars_expr(expr) for expr in node.group_by_exprs()
        ]
        aggs = []
        for agg_expr in node.aggregate_exprs():
            # Work on the concrete variant without rebinding the loop variable.
            agg = agg_expr.to_variant()
            if not isinstance(agg, AggregateFunction):
                raise Exception(
                    "Unsupported aggregate function {}".format(agg)
                )
            if agg.aggregate_type() != "COUNT":
                raise Exception(
                    "Unsupported aggregate function {}".format(
                        agg.aggregate_type()
                    )
                )
            # The output column is named after the string form of the
            # DataFusion aggregate expression.
            aggs.append(polars.count().alias("{}".format(agg)))
        if not groupby_expr:
            # A global aggregate (no GROUP BY, e.g. SELECT COUNT(*) FROM t)
            # has no grouping keys. Polars' groupby requires at least one
            # key, so evaluate the aggregates over the whole frame instead.
            return inputs[0].select(aggs)
        return inputs[0].groupby(groupby_expr).agg(aggs)
    elif isinstance(node, TableScan):
        return polars.read_parquet(self.parquet_tables[node.table_name()])
    else:
        raise Exception(
            "unsupported logical operator: {}".format(type(node))
        )