in integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala [736:909]
/**
 * Rewrites filters on Carbon relations that have secondary indexes into the plan produced by
 * `rewritePlanForSecondaryIndex`, then strips the NI UDF from every filter condition.
 *
 * A filter is only rewritten when its condition is not a bare `IsNotNull` and the relation has
 * at least one secondary index. The filter may appear on its own, under a `Project`, under a
 * `Limit`, or under `Limit` + `Project`. When the rewrite does not yield a `Join`, the original
 * node is kept unchanged.
 *
 * @param plan           the logical plan to transform
 * @param needProjection initial value of the "add a Project over the rewritten join" flag; the
 *                       flag is also switched on once a Union or Sort node is visited
 * @return the transformed plan
 */
def transformFilterToJoin(plan: LogicalPlan, needProjection: Boolean): LogicalPlan = {
  // Caches isLimitPushDownRequired(...) per "db.table" so it is evaluated once per table here.
  val limitPushDownByTable = scala.collection.mutable.Map.empty[String, Boolean]
  // If join push down is enabled, there is no need to add a projection list to the logical plan,
  // because the join output can be mapped directly to the required projections.
  // If it is disabled, the join is not pushed down to carbon, and a projection list is required
  // to map the join output.
  val pushDownJoinEnabled = sparkSession.sparkContext.getConf
    .getBoolean("spark.carbon.pushdown.join.as.filter", defaultValue = true)
  // Flag returned with every Limit/Filter/Project node handled below (Union and Sort return
  // `true` instead). NOTE(review): its meaning is defined by transformPlan, not visible here.
  val transformChild = false
  var addProjection = needProjection
  // Sort node recorded for order-by-limit push down (per query).
  var sortNodeForPushDown: Sort = null
  // Limit literal recorded for order-by-limit push down (per query).
  var limitLiteral: Literal = null
  // By default the notNull filter is not pushed down, but for order-by-limit push down it must
  // be pushed down as well, otherwise results are wrong.
  var pushDownNotNullFilter: Boolean = false

  // Records the order-by-limit push down state when `sort` sits on a Filter (directly or through
  // a Project) that checkIfPushDownOrderByLimitAndNotNullFilter accepts.
  def recordOrderByLimitPushDown(literal: Literal, sort: Sort): Unit = {
    val filterUnderSort: Option[Filter] = sort.child match {
      case filter: Filter => Some(filter)
      case Project(_, filter: Filter) => Some(filter)
      case _ => None
    }
    if (filterUnderSort.exists(checkIfPushDownOrderByLimitAndNotNullFilter(literal, sort, _))) {
      sortNodeForPushDown = sort
      limitLiteral = literal
      pushDownNotNullFilter = true
    }
  }

  // The Carbon relation scanned by a filter that matched MatchIndexableRelation.
  def carbonRelationOf(filter: Filter) =
    filter.child.asInstanceOf[LogicalRelation].relation
      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation

  // Cached per-table decision on whether the limit literal is passed to the SI rewrite.
  def shouldPushDownLimit(filter: Filter): Boolean = {
    val relation = carbonRelationOf(filter)
    limitPushDownByTable.getOrElseUpdate(
      s"${ relation.databaseName }.${ relation.tableName }",
      isLimitPushDownRequired(relation))
  }

  // Uses the join directly when join push down is enabled and no projection is required;
  // otherwise wraps it in a Project so only `original`'s output columns are returned
  // (a bare join would expose all columns of both the left and right tables).
  def projectJoinIfRequired(join: LogicalPlan, original: LogicalPlan): LogicalPlan =
    if (pushDownJoinEnabled && !addProjection) join else Project(original.output, join)

  val transformedPlan = transformPlan(plan, {
    case union: Union =>
      // In case of Union, an extra Project has to be added to the plan, because if the left
      // table is pushed to SI and the right table is not, the output attributes will mismatch.
      addProjection = true
      (union, true)
    case sort: Sort =>
      addProjection = true
      (sort, true)
    case limit@Limit(literal: Literal, sort: Sort) =>
      recordOrderByLimitPushDown(literal, sort)
      (limit, transformChild)
    case limit@Limit(literal: Literal, Project(_, sort: Sort)) =>
      recordOrderByLimitPushDown(literal, sort)
      (limit, transformChild)
    case filter@Filter(condition, MatchIndexableRelation(indexableRelation))
      if !condition.isInstanceOf[IsNotNull] &&
         CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
      val rewritten = rewritePlanForSecondaryIndex(
        filter,
        indexableRelation,
        carbonRelationOf(filter).databaseName,
        limitLiteral = limitLiteral,
        sortNodeForPushDown = sortNodeForPushDown,
        pushDownNotNullFilter = pushDownNotNullFilter)
      rewritten match {
        case join: Join => (projectJoinIfRequired(join, filter), transformChild)
        case _ => (filter, transformChild)
      }
    case projection@Project(cols, filter@Filter(condition,
        MatchIndexableRelation(indexableRelation)))
      if !condition.isInstanceOf[IsNotNull] &&
         CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
      val rewritten = rewritePlanForSecondaryIndex(
        filter,
        indexableRelation,
        carbonRelationOf(filter).databaseName,
        cols,
        limitLiteral = limitLiteral,
        sortNodeForPushDown = sortNodeForPushDown,
        pushDownNotNullFilter = pushDownNotNullFilter)
      // A Join is returned only when an index table matched.
      rewritten match {
        case join: Join => (projectJoinIfRequired(join, projection), transformChild)
        case _ => (projection, transformChild)
      }
    // When the query has a limit, the limit literal can be pushed down to the index table:
    // if all filter columns have an index table, the limit can be applied before grouping the
    // last index table, because the records returned after the join are unique and at least
    // one record will be returned.
    case limit@Limit(literal: Literal,
        filter@Filter(condition, MatchIndexableRelation(indexableRelation)))
      if !condition.isInstanceOf[IsNotNull] &&
         CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
      val databaseName = carbonRelationOf(filter).databaseName
      val rewritten = if (shouldPushDownLimit(filter)) {
        rewritePlanForSecondaryIndex(filter, indexableRelation,
          databaseName, limitLiteral = literal)
      } else {
        rewritePlanForSecondaryIndex(filter, indexableRelation,
          databaseName)
      }
      rewritten match {
        case join: Join => (Limit(literal, projectJoinIfRequired(join, limit)), transformChild)
        case _ => (limit, transformChild)
      }
    case limit@Limit(literal: Literal, projection@Project(cols, filter@Filter(condition,
        MatchIndexableRelation(indexableRelation))))
      if !condition.isInstanceOf[IsNotNull] &&
         CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
      val databaseName = carbonRelationOf(filter).databaseName
      val rewritten = if (shouldPushDownLimit(filter)) {
        rewritePlanForSecondaryIndex(filter, indexableRelation,
          databaseName, cols, limitLiteral = literal)
      } else {
        rewritePlanForSecondaryIndex(filter, indexableRelation,
          databaseName, cols)
      }
      rewritten match {
        case join: Join =>
          (Limit(literal, projectJoinIfRequired(join, projection)), transformChild)
        case _ => (limit, transformChild)
      }
  })

  // Remove the NI UDF from every filter condition in the transformed plan.
  transformedPlan.transform {
    case Filter(condition, child) =>
      Filter(CarbonHiveIndexMetadataUtil.transformToRemoveNI(condition), child)
  }
}