private def pushDownOperators()

in modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala [55:231]


    /**
      * Pushes supported Spark operators down into Ignite query accumulators.
      *
      * The plan is traversed bottom-up. Every `LogicalRelation` backed by an `IgniteSQLRelation` is replaced with
      * a `SingleTableSQLAccumulator`. `Project`, `Sort`, `Filter`, `Aggregate`, `LocalLimit`, `GlobalLimit`, `Union`
      * and `Join` nodes visited afterwards are folded into the accumulator(s) built from their children as long as
      * their guards hold. The first node that matches none of these cases is returned unchanged and raises
      * the `stepSkipped` flag, so nodes above it are left untouched until the next Ignite relation is visited.
      *
      * @param plan Logical plan to optimize.
      * @return Plan in which supported sub trees are replaced with query accumulators.
      * @throws IgniteException If `sqlCacheName` returns nothing for an Ignite table or if a node passed
      *     all push down checks while its child is not an accumulator of the expected kind.
      */
    private def pushDownOperators(plan: LogicalPlan): LogicalPlan = {
        //Passed as `aliasIndex` to every IgniteQueryContext created below,
        //so all of them draw from the same sequence 1, 2, 3, ...
        val aliasIndexIterator = Iterator.from(1)

        //Flag to indicate that some step was skipped due to unsupported expression.
        //While it is true we have to skip transformation of higher level nodes.
        var stepSkipped = true

        //Reports broken invariant: node passed all push down checks (so stepSkipped is false),
        //but its child is not an accumulator of expected kind.
        def unexpectedChild(operator: String, expected: String, child: LogicalPlan): Nothing =
            throw new IgniteException(
                s"$operator passed push down checks (stepSkipped == false) but child is not $expected: " +
                    child.getClass.getName)

        //Makes given accumulator a sub query of a new accumulator which outputs aggregate expressions.
        def aggregateOverSubQuery(agg: Aggregate, acc: QueryAccumulator): LogicalPlan = {
            val tableAlias = acc.igniteQueryContext.uniqueTableAlias

            SingleTableSQLAccumulator(
                igniteQueryContext = acc.igniteQueryContext,
                table = None,
                tableExpression = Some((acc, tableAlias)),
                outputExpressions = agg.aggregateExpressions)
        }

        //Applying optimization rules from bottom to up tree nodes.
        plan.transformUp {
            //We found basic node to transform.
            //We create new accumulator and going to the upper layers.
            case LogicalRelation(igniteSqlRelation: IgniteSQLRelation[_, _], output, _catalogTable, _) ⇒
                //Clear flag to optimize each statement separately.
                stepSkipped = false

                val igniteQueryContext = IgniteQueryContext(
                    igniteContext = igniteSqlRelation.ic,
                    sqlContext = igniteSqlRelation.sqlContext,
                    catalogTable = _catalogTable,
                    aliasIndex = aliasIndexIterator,
                    cacheName =
                        sqlCacheName(igniteSqlRelation.ic.ignite(), igniteSqlRelation.tableName,
                            igniteSqlRelation.schemaName)
                            .getOrElse(throw new IgniteException("Unknown table")))

                //Logical Relation is the bottommost TreeNode in LogicalPlan.
                //We replace it with accumulator.
                //We push all supported SQL operators into it on the higher tree levels.
                SingleTableSQLAccumulator(
                    igniteQueryContext = igniteQueryContext,
                    table = Some(igniteSqlRelation.tableName),
                    tableExpression = None,
                    outputExpressions = output.map(attr ⇒ attr.withQualifier(Seq(igniteSqlRelation.tableName))))

            case project: Project if !stepSkipped && exprsAllowed(project.projectList) ⇒
                //Project layer just changes output of current query.
                project.child match {
                    case acc: SelectAccumulator ⇒
                        acc.withOutputExpressions(
                            substituteExpressions(project.projectList, acc.outputExpressions))

                    case other ⇒
                        unexpectedChild("Project", "SelectAccumulator", other)
                }

            case sort: Sort if !stepSkipped && isSortPushDownAllowed(sort.order, sort.global) ⇒
                sort.child match {
                    case acc: QueryAccumulator ⇒
                        acc.withOrderBy(sort.order)

                    case other ⇒
                        unexpectedChild("Sort", "QueryAccumulator", other)
                }

            case filter: Filter if !stepSkipped && exprsAllowed(filter.condition) ⇒
                filter.child match {
                    case acc: SelectAccumulator ⇒
                        //Condition goes to HAVING if it contains aggregate or query is already grouped,
                        //to WHERE otherwise.
                        if (hasAggregateInside(filter.condition) || acc.groupBy.isDefined)
                            acc.withHaving(acc.having.getOrElse(Nil) :+ filter.condition)
                        else
                            acc.withWhere(acc.where.getOrElse(Nil) :+ filter.condition)

                    case other ⇒
                        unexpectedChild("Filter", "SelectAccumulator", other)
                }

            case agg: Aggregate
                if !stepSkipped && exprsAllowed(agg.groupingExpressions) && exprsAllowed(agg.aggregateExpressions) ⇒

                agg.child match {
                    //Query is already grouped, so new aggregate has to be applied to it as to a sub query.
                    case acc: SelectAccumulator if acc.groupBy.isDefined ⇒
                        aggregateOverSubQuery(agg, acc)

                    case acc: SelectAccumulator ⇒
                        acc
                            .withGroupBy(agg.groupingExpressions)
                            .withOutputExpressions(
                                substituteExpressions(agg.aggregateExpressions, acc.outputExpressions))

                    case acc: QueryAccumulator ⇒
                        aggregateOverSubQuery(agg, acc)

                    case other ⇒
                        unexpectedChild("Aggregate", "QueryAccumulator", other)
                }

            case limit: LocalLimit if !stepSkipped && exprsAllowed(limit.limitExpr) ⇒
                limit.child match {
                    case acc: SelectAccumulator ⇒
                        acc.withLocalLimit(limit.limitExpr)

                    case acc: QueryAccumulator ⇒
                        acc.withLocalLimit(limit.limitExpr)

                    case other ⇒
                        unexpectedChild("LocalLimit", "QueryAccumulator", other)
                }

            case limit: GlobalLimit if !stepSkipped && exprsAllowed(limit.limitExpr) ⇒
                //NOTE(review): unlike LocalLimit, the rule is applied with transformUp, so it also reaches
                //any children the child accumulator exposes - confirm this is intended.
                limit.child.transformUp {
                    case acc: SelectAccumulator ⇒
                        acc.withLimit(limit.limitExpr)

                    case acc: QueryAccumulator ⇒
                        acc.withLimit(limit.limitExpr)

                    case other ⇒
                        unexpectedChild("GlobalLimit", "QueryAccumulator", other)
                }

            case union: Union if !stepSkipped && isAllChildrenOptimized(union.children) ⇒
                val subQueries = union.children.map(_.asInstanceOf[QueryAccumulator])

                //Query context and output of the union are taken from its first sub query.
                UnionSQLAccumulator(
                    subQueries.head.igniteQueryContext,
                    subQueries,
                    subQueries.head.output)

            case join: Join
                if !stepSkipped && isAllChildrenOptimized(Seq(join.left, join.right)) &&
                    join.condition.forall(exprsAllowed) ⇒

                val left = join.left.asInstanceOf[QueryAccumulator]

                //Side which is not a simple table accumulator gets generated alias.
                val leftAlias =
                    if (!isSimpleTableAcc(left))
                        Some(left.igniteQueryContext.uniqueTableAlias)
                    else
                        None

                val right = join.right.asInstanceOf[QueryAccumulator]

                //Right side additionally gets generated alias when its qualifier is equal to
                //the alias (or qualifier, if there is no alias) of the left side.
                val rightAlias =
                    if (!isSimpleTableAcc(right) || leftAlias.getOrElse(left.qualifier) == right.qualifier)
                        Some(right.igniteQueryContext.uniqueTableAlias)
                    else
                        None

                JoinSQLAccumulator(
                    left.igniteQueryContext,
                    left,
                    right,
                    join.joinType,
                    left.output ++ right.output,
                    join.condition,
                    leftAlias,
                    rightAlias)

            case unknown ⇒
                //Unsupported node. Keep it as is and stop push down for the nodes above it.
                stepSkipped = true

                unknown
        }
    }