in odps/df/backends/optimize/columnpruning.py [0:0]
def _need_prune(self, expr):
    """Work out whether ``expr`` can be narrowed to the columns its consumers use.

    Every successor of ``expr`` in ``self._dag`` is inspected and the column
    names it reads from ``expr`` are gathered:

    * a ``Column`` whose input is ``expr`` contributes its ``source_name``;
    * a join, union or lateral-view parent contributes the entry recorded for
      it in ``self._remain_columns`` that corresponds to ``expr``;
    * a parent with ``_collection_resources`` that include ``expr`` blocks
      pruning entirely;
    * any other parent recorded in ``self._remain_columns`` whose ``input`` is
      ``expr`` contributes its recorded columns.

    Parents that have no entry in ``self._remain_columns`` are skipped.

    :param expr: the expression being considered for pruning
    :return: a ``(prune, columns)`` pair. ``(False, set())`` when ``expr`` has
        no successors, no columns were gathered, or ``expr`` is used as a
        collection resource. ``(True, cols)`` when some name of
        ``expr.schema.names`` is not needed; ``cols`` is then a list ordered by
        schema position. Otherwise ``(False, cols)`` with ``cols`` sorted by
        name.
    """
    parents = self._dag.successors(expr)
    if not parents:
        return False, set()

    remain = self._remain_columns
    needed = set()

    for parent in parents:
        if isinstance(parent, Column) and parent.input is expr:
            # direct column reference on expr
            needed.add(parent.source_name)
            continue

        if isinstance(parent, JoinCollectionExpr):
            key = ExprProxy(parent)
            if key in remain:
                # recorded as a (left columns, right columns) pair
                left_cols, right_cols = remain[key]
                if parent._get_child(parent._lhs) is expr:
                    needed.update(left_cols)
                elif parent._get_child(parent._rhs) is expr:
                    needed.update(right_cols)
            continue

        if isinstance(parent, UnionCollectionExpr):
            key = ExprProxy(parent)
            if key in remain:
                union_cols = remain[key]
                # identity checks on purpose: expressions may overload ``==``
                if parent._lhs is expr or parent._rhs is expr:
                    needed.update(union_cols)
            continue

        if isinstance(parent, LateralViewCollectionExpr):
            key = ExprProxy(parent)
            if key in remain:
                # one column list per source: the input first, then each lateral view
                per_source_cols = remain[key]
                sources = [parent.input] + list(parent.lateral_views)
                for source_cols, source in zip(per_source_cols, sources):
                    if source is expr:
                        needed.update(source_cols)
                        break
            continue

        if hasattr(parent, '_collection_resources') and parent._collection_resources is not None:
            resource_keys = {ExprProxy(res) for res in parent._collection_resources}
            if ExprProxy(expr) in resource_keys:
                # expr is passed to the parent as a collection resource,
                # so no column of it may be dropped
                return False, set()
            continue

        key = ExprProxy(parent)
        if key in remain and hasattr(parent, 'input') and parent.input is expr:
            needed.update(remain[key])

    if not needed:
        return False, set()

    if set(expr.schema.names) - needed:
        position = {name: idx for idx, name in enumerate(expr.schema.names)}
        return True, sorted(needed, key=position.get)
    return False, sorted(needed)