private def fixAmbiguousOutput()

in modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala [239:339]


    /**
      * Walks the plan top-down and rewrites accumulators that sit on top of a nested `JoinSQLAccumulator`,
      * using the output expressions returned by the two-argument `fixAmbiguousOutput` overload
      * (defined outside this block).
      *
      * Two kinds of nodes are handled:
      *  - a `SingleTableSQLAccumulator` with at least one `JoinSQLAccumulator` among its children;
      *  - a `JoinSQLAccumulator` whose `left` or `right` side is itself a `JoinSQLAccumulator`.
      *
      * Every other node is returned unchanged.
      *
      * @param plan Plan to process.
      * @return Plan produced by `transformDown` with the rewrites described above applied.
      */
    private def fixAmbiguousOutput(plan: LogicalPlan): LogicalPlan = plan.transformDown {
        case acc: SingleTableSQLAccumulator if acc.children.exists(_.isInstanceOf[JoinSQLAccumulator]) ⇒
            // NOTE(review): `tableExpression.get` presumably cannot fail here because the guard found
            // a join child - confirm that `children` is derived from `tableExpression`.
            val fixedChildOutput =
                fixAmbiguousOutput(acc.tableExpression.get._1.outputExpressions, acc.igniteQueryContext)

            val newOutput = substituteExpressions(acc.outputExpressions, fixedChildOutput, changeOnlyName = true)

            // NOTE(review): the result of this `copy` is never used - the original `acc` is returned below,
            // so none of the substitutions computed here reach the resulting plan. Confirm whether the copy
            // was meant to be returned. Note that the child referenced by `tableExpression` is not updated
            // with `fixedChildOutput` here, so simply returning the copy may not be correct either.
            acc.copy(
                outputExpressions = newOutput,
                where = acc.where.map(
                    substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
                groupBy = acc.groupBy.map(
                    substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
                having = acc.having.map(
                    substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
                limit = acc.limit.map(
                    substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
                localLimit = acc.localLimit.map(
                    substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
                orderBy = acc.orderBy.map(
                    substituteExpressions(_, fixedChildOutput, changeOnlyName = true)))

            acc

        case acc: JoinSQLAccumulator
            if acc.left.isInstanceOf[JoinSQLAccumulator] || acc.right.isInstanceOf[JoinSQLAccumulator] ⇒
            // Step 1: if the left side is a nested join, give it the fixed output
            // and substitute that output into this accumulator.
            val leftFixed = acc.left match {
                case leftJoin: JoinSQLAccumulator ⇒
                    val fixedChildOutput = fixAmbiguousOutput(acc.left.outputExpressions, acc.igniteQueryContext)

                    val newOutput =
                        substituteExpressions(acc.outputExpressions, fixedChildOutput, changeOnlyName = true)

                    acc.copy(
                        outputExpressions = newOutput,
                        left = leftJoin.copy(outputExpressions = fixedChildOutput),
                        condition = acc.condition.map(
                            substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
                        where = acc.where.map(
                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
                        groupBy = acc.groupBy.map(
                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
                        having = acc.having.map(
                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
                        limit = acc.limit.map(
                            substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
                        localLimit = acc.localLimit.map(
                            substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
                        orderBy = acc.orderBy.map(
                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)))

                case _ ⇒ acc
            }

            // Step 2: same treatment for the right side, applied on top of the result of step 1.
            val fixed = leftFixed.right match {
                case rightJoin: JoinSQLAccumulator ⇒
                    // NOTE(review): the fixed output is computed from `leftFixed.outputExpressions`
                    // (this accumulator's own output), whereas the left branch uses the child's output
                    // (`acc.left.outputExpressions`). Confirm the asymmetry is intentional.
                    val fixedChildOutput =
                        fixAmbiguousOutput(leftFixed.outputExpressions, leftFixed.igniteQueryContext)

                    // NOTE(review): unlike every other call in this method, `changeOnlyName` is not passed
                    // here, so the helper's default applies - confirm this is intended.
                    val newOutput = substituteExpressions(leftFixed.outputExpressions, fixedChildOutput)

                    // The clause arguments below are derived from the original `acc`, not from `leftFixed`,
                    // so the clause substitutions made in step 1 are not carried into this copy.
                    leftFixed.copy(
                        outputExpressions = newOutput,
                        right = rightJoin.copy(outputExpressions = fixedChildOutput),
                        condition = acc.condition.map(
                            substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
                        where = acc.where.map(
                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
                        groupBy = acc.groupBy.map(
                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
                        having = acc.having.map(
                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
                        limit = acc.limit.map(
                            substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
                        localLimit = acc.localLimit.map(
                            substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
                        orderBy = acc.orderBy.map(
                            substituteExpressions(_, fixedChildOutput, changeOnlyName = true)))

                case _ ⇒ leftFixed
            }

            // Step 3: every clause is set again from the original `acc`, substituted against the original
            // `acc.outputExpressions`. This replaces the clause values assigned in steps 1 and 2; of the
            // fields set there, only `outputExpressions`, `left` and `right` are kept from `fixed`.
            // NOTE(review): confirm that discarding the clause substitutions of steps 1 and 2 is intended.
            fixed.copy(
                condition = acc.condition.map(
                    substituteExpression(_, acc.outputExpressions, changeOnlyName = true)),
                where = acc.where.map(
                    substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)),
                groupBy = acc.groupBy.map(
                    substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)),
                having = acc.having.map(
                    substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)),
                limit = acc.limit.map(
                    substituteExpression(_, acc.outputExpressions, changeOnlyName = true)),
                localLimit = acc.localLimit.map(
                    substituteExpression(_, acc.outputExpressions, changeOnlyName = true)),
                orderBy = acc.orderBy.map(
                    substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)))

        // Any other node is left as is.
        case unknown ⇒
            unknown
    }