in modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala [239:339]
private def fixAmbiguousOutput(plan: LogicalPlan): LogicalPlan = plan.transformDown {
case acc: SingleTableSQLAccumulator if acc.children.exists(_.isInstanceOf[JoinSQLAccumulator]) ⇒
val fixedChildOutput =
fixAmbiguousOutput(acc.tableExpression.get._1.outputExpressions, acc.igniteQueryContext)
val newOutput = substituteExpressions(acc.outputExpressions, fixedChildOutput, changeOnlyName = true)
acc.copy(
outputExpressions = newOutput,
where = acc.where.map(
substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
groupBy = acc.groupBy.map(
substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
having = acc.having.map(
substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
limit = acc.limit.map(
substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
localLimit = acc.localLimit.map(
substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
orderBy = acc.orderBy.map(
substituteExpressions(_, fixedChildOutput, changeOnlyName = true)))
acc
case acc: JoinSQLAccumulator
if acc.left.isInstanceOf[JoinSQLAccumulator] || acc.right.isInstanceOf[JoinSQLAccumulator] ⇒
val leftFixed = acc.left match {
case leftJoin: JoinSQLAccumulator ⇒
val fixedChildOutput = fixAmbiguousOutput(acc.left.outputExpressions, acc.igniteQueryContext)
val newOutput =
substituteExpressions(acc.outputExpressions, fixedChildOutput, changeOnlyName = true)
acc.copy(
outputExpressions = newOutput,
left = leftJoin.copy(outputExpressions = fixedChildOutput),
condition = acc.condition.map(
substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
where = acc.where.map(
substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
groupBy = acc.groupBy.map(
substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
having = acc.having.map(
substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
limit = acc.limit.map(
substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
localLimit = acc.localLimit.map(
substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
orderBy = acc.orderBy.map(
substituteExpressions(_, fixedChildOutput, changeOnlyName = true)))
case _ ⇒ acc
}
val fixed = leftFixed.right match {
case rightJoin: JoinSQLAccumulator ⇒
val fixedChildOutput =
fixAmbiguousOutput(leftFixed.outputExpressions, leftFixed.igniteQueryContext)
val newOutput = substituteExpressions(leftFixed.outputExpressions, fixedChildOutput)
leftFixed.copy(
outputExpressions = newOutput,
right = rightJoin.copy(outputExpressions = fixedChildOutput),
condition = acc.condition.map(
substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
where = acc.where.map(
substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
groupBy = acc.groupBy.map(
substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
having = acc.having.map(
substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
limit = acc.limit.map(
substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
localLimit = acc.localLimit.map(
substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
orderBy = acc.orderBy.map(
substituteExpressions(_, fixedChildOutput, changeOnlyName = true)))
case _ ⇒ leftFixed
}
fixed.copy(
condition = acc.condition.map(
substituteExpression(_, acc.outputExpressions, changeOnlyName = true)),
where = acc.where.map(
substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)),
groupBy = acc.groupBy.map(
substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)),
having = acc.having.map(
substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)),
limit = acc.limit.map(
substituteExpression(_, acc.outputExpressions, changeOnlyName = true)),
localLimit = acc.localLimit.map(
substituteExpression(_, acc.outputExpressions, changeOnlyName = true)),
orderBy = acc.orderBy.map(
substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)))
case unknown ⇒
unknown
}