def transformFilterToJoin()

in integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala [736:909]


  /**
   * Rewrites filters on Carbon relations that have secondary indexes into joins with the index
   * tables (via `rewritePlanForSecondaryIndex`). It also handles the Project / Limit /
   * Limit-over-Sort shapes that can sit on top of such a filter.
   *
   * Afterwards, `CarbonHiveIndexMetadataUtil.transformToRemoveNI` is applied to the condition of
   * every remaining Filter in the transformed plan.
   *
   * @param plan           logical plan to transform
   * @param needProjection initial value of the "add projection" flag. When true (or once a Union
   *                       or Sort is seen), a rewritten join is wrapped in a Project that restores
   *                       the original node's output.
   * @return the transformed logical plan
   */
  def transformFilterToJoin(plan: LogicalPlan, needProjection: Boolean): LogicalPlan = {
    // Per table ("db.table") cache of isLimitPushDownRequired, so it is evaluated once per table.
    val isRowDeletedInTableMap = scala.collection.mutable.Map.empty[String, Boolean]
    // if the join push down is enabled, then no need to add projection list to the logical plan as
    // we can directly map the join output with the required projections
    // if it is false then the join will not be pushed down to carbon and
    // there it is required to add projection list to map the output from the join
    val pushDownJoinEnabled = sparkSession.sparkContext.getConf
      .getBoolean("spark.carbon.pushdown.join.as.filter", defaultValue = true)
    // Flag returned to transformPlan alongside each rewritten node; its exact meaning is defined
    // by transformPlan (not visible here).
    val transformChild = false
    var addProjection = needProjection
    // to store the sort node per query
    var sortNodeForPushDown: Sort = null
    // to store the limit literal per query
    var limitLiteral: Literal = null
    // by default do not push down notNull filter,
    // but for orderby limit push down, push down notNull filter also. Else we get wrong results.
    var pushDownNotNullFilter: Boolean = false

    // Resolves the Carbon relation behind a filter matched by MatchIndexableRelation.
    def carbonRelationOf(filter: Filter) =
      filter.child.asInstanceOf[LogicalRelation].relation
        .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation

    // Handles the node directly under a Limit->Sort. If it is a Filter, optionally under one
    // Project, and checkIfPushDownOrderByLimitAndNotNullFilter accepts it, this remembers the
    // sort and limit and enables not-null filter push down for later filter rewrites.
    // NOTE(review): this relies on transformPlan visiting the Limit before the Filter beneath it.
    def recordOrderByLimitPushDown(literal: Literal, sort: Sort, sortChild: LogicalPlan): Unit = {
      val filterUnderSort = sortChild match {
        case filter: Filter => Some(filter)
        case Project(_, filter: Filter) => Some(filter)
        case _ => None
      }
      filterUnderSort.foreach { filter =>
        if (checkIfPushDownOrderByLimitAndNotNullFilter(literal, sort, filter)) {
          sortNodeForPushDown = sort
          limitLiteral = literal
          pushDownNotNullFilter = true
        }
      }
    }

    // If Index table is matched, join plan will be returned by the rewrite; otherwise None.
    // Without join push down (or when a projection is required), the join is wrapped in a
    // Project over `outputSource.output`. Otherwise all columns from the left and right tables
    // would be returned as output columns.
    def projectRewrittenJoin(
        rewritten: LogicalPlan,
        outputSource: LogicalPlan): Option[LogicalPlan] = rewritten match {
      case join: Join =>
        if (pushDownJoinEnabled && !addProjection) {
          Some(join)
        } else {
          Some(Project(outputSource.output, join))
        }
      case _ => None
    }

    val transformedPlan = transformPlan(plan, {
      case union: Union =>
        // In case of Union, Extra Project has to be added to the Plan. Because if left table is
        // pushed to SI and right table is not pushed, then Output Attribute mismatch will happen
        addProjection = true
        (union, true)
      case sort@Sort(_, _, _) =>
        addProjection = true
        (sort, true)
      case limit@Limit(literal: Literal, sort@Sort(_, _, sortChild)) =>
        recordOrderByLimitPushDown(literal, sort, sortChild)
        (limit, transformChild)
      case limit@Limit(literal: Literal, _@Project(_, sort@Sort(_, _, sortChild))) =>
        recordOrderByLimitPushDown(literal, sort, sortChild)
        (limit, transformChild)
      case filter@Filter(condition, _@MatchIndexableRelation(indexableRelation))
        if !condition.isInstanceOf[IsNotNull] &&
           CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
        val reWrittenPlan = rewritePlanForSecondaryIndex(
          filter,
          indexableRelation,
          carbonRelationOf(filter).databaseName,
          limitLiteral = limitLiteral,
          sortNodeForPushDown = sortNodeForPushDown,
          pushDownNotNullFilter = pushDownNotNullFilter)
        (projectRewrittenJoin(reWrittenPlan, filter).getOrElse(filter), transformChild)
      case projection@Project(cols, filter@Filter(condition,
      _@MatchIndexableRelation(indexableRelation)))
        if !condition.isInstanceOf[IsNotNull] &&
           CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
        val reWrittenPlan = rewritePlanForSecondaryIndex(
          filter,
          indexableRelation,
          carbonRelationOf(filter).databaseName,
          cols,
          limitLiteral = limitLiteral,
          sortNodeForPushDown = sortNodeForPushDown,
          pushDownNotNullFilter = pushDownNotNullFilter)
        (projectRewrittenJoin(reWrittenPlan, projection).getOrElse(projection), transformChild)
      // When limit is provided in query, this limit literal can be pushed down to index table
      // if all the filter columns have index table, then limit can be pushed down before grouping
      // last index table, as number of records returned after join where unique and it will
      // definitely return at least 1 record.
      case limit@Limit(literal: Literal,
      filter@Filter(condition, _@MatchIndexableRelation(indexableRelation)))
        if !condition.isInstanceOf[IsNotNull] &&
           CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
        val carbonRelation = carbonRelationOf(filter)
        val uniqueTableName = s"${ carbonRelation.databaseName }.${ carbonRelation.tableName }"
        val pushLimitToIndex = isRowDeletedInTableMap.getOrElseUpdate(
          uniqueTableName, isLimitPushDownRequired(carbonRelation))
        val reWrittenPlan = if (pushLimitToIndex) {
          rewritePlanForSecondaryIndex(filter, indexableRelation,
            carbonRelation.databaseName, limitLiteral = literal)
        } else {
          rewritePlanForSecondaryIndex(filter, indexableRelation,
            carbonRelation.databaseName)
        }
        val rewrittenLimit = projectRewrittenJoin(reWrittenPlan, limit).map(Limit(literal, _))
        (rewrittenLimit.getOrElse(limit), transformChild)
      case limit@Limit(literal: Literal, projection@Project(cols, filter@Filter(condition,
      _@MatchIndexableRelation(indexableRelation))))
        if !condition.isInstanceOf[IsNotNull] &&
           CarbonIndexUtil.getSecondaryIndexes(indexableRelation).nonEmpty =>
        val carbonRelation = carbonRelationOf(filter)
        val uniqueTableName = s"${ carbonRelation.databaseName }.${ carbonRelation.tableName }"
        val pushLimitToIndex = isRowDeletedInTableMap.getOrElseUpdate(
          uniqueTableName, isLimitPushDownRequired(carbonRelation))
        val reWrittenPlan = if (pushLimitToIndex) {
          rewritePlanForSecondaryIndex(filter, indexableRelation,
            carbonRelation.databaseName, cols, limitLiteral = literal)
        } else {
          rewritePlanForSecondaryIndex(filter, indexableRelation,
            carbonRelation.databaseName, cols)
        }
        val rewrittenLimit =
          projectRewrittenJoin(reWrittenPlan, projection).map(Limit(literal, _))
        (rewrittenLimit.getOrElse(limit), transformChild)
    })
    // Strip the NI UDF from every remaining filter condition once the rewrite is done.
    transformedPlan.transform {
      case filter: Filter =>
        Filter(CarbonHiveIndexMetadataUtil.transformToRemoveNI(filter.condition), filter.child)
    }
  }