in odps/df/backends/optimize/predicatepushdown.py [0:0]
def visit_filter_collection(self, expr):
    """Push the predicate of a filter collection below its input when safe.

    The method returns early, pushing nothing, in two cases:

    * any node on a path from ``expr`` down to ``expr.input`` (presumably
      the filter's own predicate) is a ``Window``, ``SequenceReduction`` or
      ``Func``;
    * any split predicate is an ``IsIn`` whose ``_values`` is ``None`` or a
      single ``SequenceExpr``. NOTE(review): presumably an ``isin`` against
      another sequence / subquery; confirm.

    Directly stacked ``FilterCollectionExpr`` nodes below ``expr`` are
    collected. When there is more than one, they are compacted into ``expr``
    before pushing. What happens next depends on the first non-filter input:

    * ``ProjectCollectionExpr``: skipped if the projection contains a
      ``Window``, ``SequenceReduction`` or ``MappedExpr``. For a
      ``LateralViewCollectionExpr``, only predicates that reference no
      lateral-view output column are pushed; the others stay on ``expr``.
    * ``InnerJoin``: each predicate that ``_predicate_on_same_collection``
      maps to a collection is pushed into it; the others stay on ``expr``.
    * ``UnionCollectionExpr``: the whole predicate is pushed through the
      union.

    When no predicate is left on ``expr``, it is replaced by its input in
    ``self._dag``.

    :param expr: the ``FilterCollectionExpr`` being visited.
    :return: ``None``; ``self._dag`` (and ``expr``) are rewritten in place.
    """
    # Function-level import. NOTE(review): presumably avoids a circular
    # import with .core; confirm.
    from .core import Optimizer

    predicates = self._split_filter_into_predicates(expr)

    # if any reduction like sum or window op exists, skip
    if any(isinstance(seq, (Window, SequenceReduction, Func))
           for seq in itertools.chain(*expr.all_path(expr.input, strict=True))):
        return

    if any(isinstance(p, IsIn) and
           (p._values is None or
            (len(p._values) == 1 and isinstance(p._values[0], SequenceExpr)))
           for p in predicates):
        return

    # NOTE(review): the two guards above inspect only expr's own predicate.
    # Predicates merged in from inner filters by compact_filters() below are
    # not re-checked. Confirm that this is intended.

    # Collect the run of directly stacked filters. ``source`` ends up as the
    # first non-filter node below them.
    filters = [expr]
    source = expr.input
    while isinstance(source, FilterCollectionExpr):
        filters.append(source)
        source = source.input

    def compact_filters():
        """Merge the stacked filters into ``expr``; return True if merged."""
        if len(filters) <= 1:
            return False
        merged = Optimizer.get_compact_filters(self._dag, *filters)
        old_predicate = expr._predicate
        expr._predicate = merged.predicate
        self._dag.substitute(old_predicate, expr._predicate, parents=[expr])
        # Record the absorbed inner filters as traversed.
        for inner in filters[1:]:
            self._traversed.add(id(inner))
        return True

    if isinstance(source, ProjectCollectionExpr):
        # if any reduction like sum or window op exists, skip
        if any(isinstance(seq, (Window, SequenceReduction, MappedExpr))
               for seq in source.traverse(top_down=True, unique=True,
                                          stop_cond=lambda x: x is source.input)):
            return

        if compact_filters():
            # expr's predicate was replaced, so split it again
            predicates = self._split_filter_into_predicates(expr)
        predicate = expr.predicate
        remain = None
        if isinstance(source, LateralViewCollectionExpr):
            # Predicates that reference a lateral-view output column are kept
            # on this filter; only the others are pushed below it.
            udtf_columns = set()
            for lv in source.lateral_views:
                udtf_columns.update(lv.schema.names)
            pushes = []
            remains = []
            for p in predicates:
                cols = {col.source_name
                        for path in p.all_path(source, strict=True)
                        for col in path if isinstance(col, Column)}
                if cols & udtf_columns:
                    remains.append(p)
                else:
                    pushes.append(p)
            if not pushes:
                return
            predicate = reduce(operator.and_, pushes)
            if remains:
                remain = reduce(operator.and_, remains)
        self._push_down_through_projection(predicate, expr, source)
        if remain is None:
            self._dag.substitute(expr, expr.input)
        else:
            expr.substitute(expr._predicate, remain, dag=self._dag)
    elif isinstance(source, InnerJoin):
        if compact_filters():
            # expr's predicate was replaced, so split it again
            predicates = self._split_filter_into_predicates(expr)
        # Predicates that cannot be pushed into the join, in reversed order.
        remains = []
        for predicate in reversed(predicates):
            collection = self._predicate_on_same_collection(
                predicate, expr.input, source)
            if collection is not False:
                self._push_down_through_join(predicate, expr, source, collection)
            else:
                remains.append(predicate)
        if not remains:
            self._dag.substitute(expr, expr.input)
        else:
            expr.substitute(expr._predicate,
                            reduce(operator.and_, remains),
                            dag=self._dag)
    elif isinstance(source, UnionCollectionExpr):
        compact_filters()
        self._push_down_through_union(expr.predicate, expr, source)
        self._dag.substitute(expr, expr.input)