odps/df/backends/sqlalchemy/rewriter.py (46 lines of code) (raw):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import operator
# ``reduce`` is not a builtin on Python 3; import it explicitly instead of
# relying on it leaking in through the wildcard import below.
from functools import reduce

from ..rewriter import BaseRewriter
from ...expr.merge import *
from ...utils import traverse_until_source


class Rewriter(BaseRewriter):
    """Expression rewriter used by this backend.

    Adds three hooks on top of ``BaseRewriter``:

    * projections and filters delegate to the base class's reduction
      rewrites;
    * column references whose input is a join are redirected to the
      originating (non-join) collection;
    * join nodes get their list predicates combined with ``&``, nested
      right-hand joins wrapped in a projection, and -- when column names
      conflict -- an explicit projection added on top.
    """

    def visit_project_collection(self, expr):
        """Apply the base class's reduction rewrite to a projection node."""
        self._rewrite_reduction_in_projection(expr)

    def visit_filter_collection(self, expr):
        """Apply the base class's reduction rewrite to a filter node."""
        self._rewrite_reduction_in_filter(expr)

    def visit_column(self, expr):
        """Redirect a column taken from a join to the collection it came from.

        Each join maps an output column name to ``(idx, name)`` through
        ``_column_origins``, where ``idx`` 0 selects ``lhs`` and 1 selects
        ``rhs``. The mapping is followed through nested joins until a
        non-join collection is reached. The column is then replaced by that
        collection's column, renamed back to the original output name.
        """
        if not isinstance(expr.input, JoinCollectionExpr):
            return

        source = expr.input
        name = expr.source_name
        while isinstance(source, JoinCollectionExpr):
            idx, name = source._column_origins[name]
            source = (source.lhs, source.rhs)[idx]
        # presumably ``_sub`` swaps ``expr`` for the new node in the
        # expression graph (BaseRewriter helper -- confirm)
        self._sub(expr, source[name].rename(expr.name))

    def visit_join(self, expr):
        """Normalize a join node.

        1. A predicate given as a list is folded into one expression with
           ``&``.
        2. If the right-hand side is itself a join, it is wrapped in a
           ``JoinProjectCollectionExpr`` carrying the same schema and fields.
           If it is already a ``JoinProjectCollectionExpr``, its input is
           substituted by it. Only the right-hand side is handled; the
           left-hand side is left untouched.
        3. If any join on the left spine reports ``column_conflict``, and
           this join has no parents or no parent that is a projection or
           join, the join is replaced by ``expr[expr]``.
        """
        if expr._predicate and isinstance(expr._predicate, list):
            expr._predicate = reduce(operator.and_, expr._predicate)

        rhs = expr.rhs
        rhs_parents = self._parents(rhs)
        if isinstance(rhs, JoinCollectionExpr):
            projection = JoinProjectCollectionExpr(
                _input=rhs, _schema=rhs.schema,
                _fields=rhs._fetch_fields())
            self._sub(rhs, projection, rhs_parents)
        elif isinstance(rhs, JoinProjectCollectionExpr):
            self._sub(rhs.input, rhs, rhs_parents)

        # Walk down the left spine of nested joins looking for any join
        # that reports conflicting column names.
        need_project = False
        current = expr
        while isinstance(current, JoinCollectionExpr):
            if current.column_conflict:
                need_project = True
                break
            current = current.lhs

        if not need_project:
            return

        parents = self._parents(expr)
        # ``not parents`` also guards against ``_parents`` returning None
        # (not established here -- kept for safety).
        if not parents or \
                not any(isinstance(parent, (ProjectCollectionExpr, JoinCollectionExpr))
                        for parent in parents):
            # assumes indexing a collection by itself projects all of its
            # columns (PyODPS collection semantics -- confirm)
            to_sub = expr[expr]
            self._sub(expr, to_sub, parents)