in modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala [55:231]
/**
 * Walks the logical plan from the leaves to the root and folds every supported operator into an
 * Ignite SQL query accumulator.
 *
 * Each `LogicalRelation` backed by an `IgniteSQLRelation` is replaced by a `SingleTableSQLAccumulator`.
 * `Project`, `Sort`, `Filter`, `Aggregate`, `LocalLimit`, `GlobalLimit`, `Union` and `Join` nodes found
 * above it are merged into the accumulator as long as their expressions pass the corresponding check
 * (`exprsAllowed`, `isSortPushDownAllowed`, `isAllChildrenOptimized`).
 * The first node that does not match any of these rules is returned unchanged and raises the
 * `stepSkipped` flag, which keeps the nodes visited after it untouched until the next Ignite relation
 * is met.
 *
 * @param plan Logical plan to optimize.
 * @return Plan in which the supported operators are replaced with query accumulators.
 * @throws IgniteException If the cache name of an Ignite table can't be resolved ("Unknown table"), or
 *      if a node is visited with `stepSkipped == false` while its child is not an accumulator of the
 *      kind the rule expects.
 */
private def pushDownOperators(plan: LogicalPlan): LogicalPlan = {
    //Single source of alias indexes shared by every query context created during this pass.
    val aliasIndexIterator = Stream.from(1).iterator

    //Flag to indicate that some step was skipped due to an unsupported expression.
    //While it is true we have to skip the transformation of the higher level nodes.
    var stepSkipped = true

    //Reports a broken invariant: the flag says the child was optimized, but the child is not
    //an accumulator of the expected kind.
    def unexpectedChild(expected: String): Nothing =
        throw new IgniteException(s"stepSkipped == false but child is not $expected")

    //Applying optimization rules from the bottom to the top of the tree.
    plan.transformUp {
        //We found a basic node to transform.
        //We create a new accumulator and go on to the upper layers.
        case LogicalRelation(igniteSqlRelation: IgniteSQLRelation[_, _], output, _catalogTable, _) ⇒
            //Clear the flag to optimize each statement separately.
            stepSkipped = false

            val igniteQueryContext = IgniteQueryContext(
                igniteContext = igniteSqlRelation.ic,
                sqlContext = igniteSqlRelation.sqlContext,
                catalogTable = _catalogTable,
                aliasIndex = aliasIndexIterator,
                cacheName =
                    sqlCacheName(igniteSqlRelation.ic.ignite(), igniteSqlRelation.tableName,
                        igniteSqlRelation.schemaName)
                        .getOrElse(throw new IgniteException("Unknown table")))

            //Logical relation is the lowest TreeNode in a LogicalPlan.
            //We replace it with an accumulator.
            //All supported SQL operators of the higher tree levels are pushed into it.
            SingleTableSQLAccumulator(
                igniteQueryContext = igniteQueryContext,
                table = Some(igniteSqlRelation.tableName),
                tableExpression = None,
                outputExpressions = output.map(attr ⇒ attr.withQualifier(Seq(igniteSqlRelation.tableName))))

        case project: Project if !stepSkipped && exprsAllowed(project.projectList) ⇒
            //Project layer just changes the output of the current query.
            project.child match {
                case acc: SelectAccumulator ⇒
                    acc.withOutputExpressions(
                        substituteExpressions(project.projectList, acc.outputExpressions))

                case _ ⇒
                    unexpectedChild("SelectAccumulator")
            }

        case sort: Sort if !stepSkipped && isSortPushDownAllowed(sort.order, sort.global) ⇒
            sort.child match {
                case acc: QueryAccumulator ⇒
                    acc.withOrderBy(sort.order)

                case _ ⇒
                    unexpectedChild("QueryAccumulator")
            }

        case filter: Filter if !stepSkipped && exprsAllowed(filter.condition) ⇒
            filter.child match {
                case acc: SelectAccumulator ⇒
                    //A condition over aggregates, or over an already grouped query, goes to HAVING.
                    //Everything else is appended to WHERE.
                    if (hasAggregateInside(filter.condition) || acc.groupBy.isDefined)
                        acc.withHaving(acc.having.getOrElse(Nil) :+ filter.condition)
                    else
                        acc.withWhere(acc.where.getOrElse(Nil) :+ filter.condition)

                case _ ⇒
                    unexpectedChild("SelectAccumulator")
            }

        case agg: Aggregate
            if !stepSkipped && exprsAllowed(agg.groupingExpressions) && exprsAllowed(agg.aggregateExpressions) ⇒

            agg.child match {
                case acc: SelectAccumulator ⇒
                    if (acc.groupBy.isDefined) {
                        //The query is grouped already, so it becomes a table expression of a new query.
                        val tableAlias = acc.igniteQueryContext.uniqueTableAlias

                        SingleTableSQLAccumulator(
                            igniteQueryContext = acc.igniteQueryContext,
                            table = None,
                            tableExpression = Some((acc, tableAlias)),
                            outputExpressions = agg.aggregateExpressions)
                    }
                    else
                        acc
                            .withGroupBy(agg.groupingExpressions)
                            .withOutputExpressions(
                                substituteExpressions(agg.aggregateExpressions, acc.outputExpressions))

                case acc: QueryAccumulator ⇒
                    //Any other accumulator is wrapped into a table expression of a new query.
                    val tableAlias = acc.igniteQueryContext.uniqueTableAlias

                    SingleTableSQLAccumulator(
                        igniteQueryContext = acc.igniteQueryContext,
                        table = None,
                        tableExpression = Some((acc, tableAlias)),
                        outputExpressions = agg.aggregateExpressions)

                case _ ⇒
                    unexpectedChild("SelectAccumulator or QueryAccumulator")
            }

        case limit: LocalLimit if !stepSkipped && exprsAllowed(limit.limitExpr) ⇒
            limit.child match {
                case acc: SelectAccumulator ⇒
                    acc.withLocalLimit(limit.limitExpr)

                case acc: QueryAccumulator ⇒
                    acc.withLocalLimit(limit.limitExpr)

                case _ ⇒
                    unexpectedChild("SelectAccumulator or QueryAccumulator")
            }

        case limit: GlobalLimit if !stepSkipped && exprsAllowed(limit.limitExpr) ⇒
            //NOTE(review): unlike LocalLimit above, this uses transformUp rather than match, so the rule
            //is applied to every node of the child subtree, not only to the child itself.
            //Confirm that this is intended.
            limit.child.transformUp {
                case acc: SelectAccumulator ⇒
                    acc.withLimit(limit.limitExpr)

                case acc: QueryAccumulator ⇒
                    acc.withLimit(limit.limitExpr)

                case _ ⇒
                    unexpectedChild("SelectAccumulator or QueryAccumulator")
            }

        case union: Union if !stepSkipped && isAllChildrenOptimized(union.children) ⇒
            val subQueries = union.children.map(_.asInstanceOf[QueryAccumulator])

            //The query context and the output of the union are taken from its first sub query.
            UnionSQLAccumulator(
                subQueries.head.igniteQueryContext,
                subQueries,
                subQueries.head.output)

        case join: Join
            if !stepSkipped && isAllChildrenOptimized(Seq(join.left, join.right)) &&
                join.condition.forall(exprsAllowed) ⇒

            val left = join.left.asInstanceOf[QueryAccumulator]

            //Anything but a simple table accumulator gets a generated alias.
            val leftAlias =
                if (!isSimpleTableAcc(left))
                    Some(left.igniteQueryContext.uniqueTableAlias)
                else
                    None

            val right = join.right.asInstanceOf[QueryAccumulator]

            //The right side also gets an alias when its qualifier clashes with the left one.
            val rightAlias =
                if (!isSimpleTableAcc(right) ||
                    leftAlias.getOrElse(left.qualifier) == right.qualifier)
                    Some(right.igniteQueryContext.uniqueTableAlias)
                else
                    None

            JoinSQLAccumulator(
                left.igniteQueryContext,
                left,
                right,
                join.joinType,
                left.output ++ right.output,
                join.condition,
                leftAlias,
                rightAlias)

        case unknown ⇒
            //Unsupported node: leave it as is and stop optimizing the nodes above it.
            stepSkipped = true

            unknown
    }
}