def visit_filter_collection()

in odps/df/backends/optimize/predicatepushdown.py [0:0]


    def visit_filter_collection(self, expr):
        """Try to push the predicate of filter ``expr`` below its input.

        The filter is first split into predicates with
        ``_split_filter_into_predicates``.  Nothing is changed when:

        * a ``Window``, ``SequenceReduction`` or ``Func`` node lies on any
          path between ``expr`` and ``expr.input``; or
        * any predicate is an ``IsIn`` whose ``_values`` is ``None`` or
          holds exactly one ``SequenceExpr``.

        Otherwise the chain of directly nested ``FilterCollectionExpr``
        inputs is followed down to the first non-filter input, and the
        handling depends on that input's type:

        * ``ProjectCollectionExpr``: skipped if the projection contains a
          ``Window``, ``SequenceReduction`` or ``MappedExpr``.  For a
          ``LateralViewCollectionExpr`` only predicates that reference no
          lateral-view column are pushed; the rest stay on the filter.
        * ``InnerJoin``: predicates are visited in reverse order and each
          one for which ``_predicate_on_same_collection`` does not return
          ``False`` is pushed through the join.
        * ``UnionCollectionExpr``: the whole predicate is pushed.

        When several filters are stacked they are first compacted into
        ``expr`` via ``Optimizer.get_compact_filters``.  When no predicate
        remains on the filter, ``expr`` is replaced by its input in the DAG.

        :param expr: the filter collection expression being visited
        :return: always ``None``; the work is done by mutating ``self._dag``
        """
        from .core import Optimizer

        predicates = self._split_filter_into_predicates(expr)

        # if any reduction like sum or window op exists, skip
        paths_to_input = expr.all_path(expr.input, strict=True)
        if any(isinstance(node, (Window, SequenceReduction, Func))
               for node in itertools.chain(*paths_to_input)):
            return

        def blocks_push_down(pred):
            # An ``isin`` without values, or against a single sequence
            # expression, prevents the push down.
            if not isinstance(pred, IsIn):
                return False
            if pred._values is None:
                return True
            return (len(pred._values) == 1 and
                    isinstance(pred._values[0], SequenceExpr))

        if any(blocks_push_down(pred) for pred in predicates):
            return

        # Collect ``expr`` plus every filter stacked directly beneath it;
        # ``source`` ends up as the first input that is not a filter.
        filter_chain = [expr]
        source = expr.input
        while isinstance(source, FilterCollectionExpr):
            filter_chain.append(source)
            source = source.input

        def merge_filter_chain():
            # Fold the stacked filters into ``expr``; report whether a merge
            # actually took place.
            if len(filter_chain) <= 1:
                return False
            compacted = Optimizer.get_compact_filters(self._dag, *filter_chain)
            previous = expr._predicate
            expr._predicate = compacted.predicate
            self._dag.substitute(previous, expr._predicate, parents=[expr])
            # The absorbed filters are recorded as already traversed.
            for absorbed in filter_chain[1:]:
                self._traversed.add(id(absorbed))
            return True

        if isinstance(source, ProjectCollectionExpr):
            projection_nodes = source.traverse(
                top_down=True, unique=True,
                stop_cond=lambda node: node is source.input)
            # if any reduction like sum or window op exists, skip
            if any(isinstance(node, (Window, SequenceReduction, MappedExpr))
                   for node in projection_nodes):
                return

            if merge_filter_chain():
                predicates = self._split_filter_into_predicates(expr)

            to_push = expr.predicate
            leftover = None
            if isinstance(source, LateralViewCollectionExpr):
                # Columns named in a lateral view's schema; predicates that
                # touch them are kept on the filter.
                udtf_columns = set()
                for view in source.lateral_views:
                    udtf_columns.update(view.schema.names)

                pushable, blocked = [], []
                for pred in predicates:
                    referenced = set(
                        node.source_name
                        for path in pred.all_path(source, strict=True)
                        for node in path
                        if isinstance(node, Column))
                    if referenced & udtf_columns:
                        blocked.append(pred)
                    else:
                        pushable.append(pred)

                if not pushable:
                    return
                to_push = reduce(operator.and_, pushable)
                if blocked:
                    leftover = reduce(operator.and_, blocked)

            self._push_down_through_projection(to_push, expr, source)
            if leftover is None:
                # Everything was pushed: the filter node itself is dropped.
                self._dag.substitute(expr, expr.input)
            else:
                expr.substitute(expr._predicate, leftover, dag=self._dag)
            return

        if isinstance(source, InnerJoin):
            if merge_filter_chain():
                predicates = self._split_filter_into_predicates(expr)

            unpushed = []
            for pred in predicates[::-1]:
                target = self._predicate_on_same_collection(
                    pred, expr.input, source)
                if target is False:
                    unpushed.append(pred)
                else:
                    self._push_down_through_join(pred, expr, source, target)

            if unpushed:
                expr.substitute(expr._predicate,
                                reduce(operator.and_, unpushed),
                                dag=self._dag)
            else:
                self._dag.substitute(expr, expr.input)
            return

        if isinstance(source, UnionCollectionExpr):
            merge_filter_chain()
            self._push_down_through_union(expr.predicate, expr, source)
            self._dag.substitute(expr, expr.input)