def addInFilterToPlan()

in integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala [150:252]


  /**
   * Pushes the join-key values of the broadcast (build) side down to the scan found under
   * `carbonScan`.
   *
   * The build-side keys are bound to `buildPlan.output` and evaluated against every row of
   * `inputCopy`; each value is wrapped in a literal and handed, together with the keys of the
   * other join side, to `addPushDownFilters` / `addPushDownToCarbonRDD`.
   *
   * Nothing is pushed unless all of the following hold:
   *  - a `CarbonDataSourceScan` or `RowDataSourceScanExec` accepted by `matchScan` is found,
   *  - there is at least one build-side key,
   *  - `isIndexTable` is true, or the number of rows in `inputCopy` is between 1 and the
   *    configured `CarbonCommonConstants.BROADCAST_RECORD_SIZE` (inclusive).
   *
   * @param buildPlan    plan whose output the build-side keys are bound against
   * @param carbonScan   plan that is searched for the scan to push the filter into
   * @param inputCopy    build-side rows the keys are evaluated on
   * @param leftKeys     join keys of the left side
   * @param rightKeys    join keys of the right side
   * @param buildSide    tells which side (left or right) is the build side
   * @param isIndexTable when true, the projection match and the record-size limit are skipped
   */
  def addInFilterToPlan(buildPlan: SparkPlan,
      carbonScan: SparkPlan,
      inputCopy: Array[InternalRow],
      leftKeys: Seq[Expression],
      rightKeys: Seq[Expression],
      buildSide: CarbonToSparkAdapter.CarbonBuildSideType,
      isIndexTable: Boolean = false): Unit = {

    val carbonBuildSide = CarbonBuildSide(buildSide)
    // Keys of the build side are evaluated on the rows; keys of the other side name the
    // column(s) the resulting filter is applied to.
    val (buildKeys, probeKeys) =
      if (carbonBuildSide.isLeft) (leftKeys, rightKeys) else (rightKeys, leftKeys)

    val keys = buildKeys.map { key =>
      BindReferences.bindReference(key, buildPlan.output)
    }.toArray

    // Wraps the value of `key` in `row` as a literal.
    def toLiteral(key: Expression, row: InternalRow): Expression = {
      val curr = key.eval(row)
      curr match {
        case _: UTF8String => Literal(curr.toString)
        // A bare Long would otherwise become a LongType literal and lose the timestamp type.
        case _: Long if key.dataType.isInstanceOf[TimestampType] => Literal(curr, TimestampType)
        case _ => Literal(curr)
      }
    }

    val filterKey = probeKeys.collectFirst { case a: Attribute => a }

    // Replaces attribute references that are produced by an alias with the aliased expression.
    def resolveAlias(expressions: Seq[Expression]): Seq[Expression] = {
      val aliasMap = new mutable.HashMap[Attribute, Expression]()
      // transformExpressions is used only to visit the aliases; the returned plan is discarded.
      // NOTE(review): Spark documents transformExpressions as covering the expressions of this
      // operator only - confirm aliases defined in child operators are not needed here.
      carbonScan.transformExpressions {
        case alias: Alias =>
          aliasMap.put(alias.toAttribute, alias.child)
          alias
      }
      expressions.map {
        case at: AttributeReference =>
          // cannot use Map.get() as qualifier is different.
          aliasMap.find(_._1.semanticEquals(at)).map(_._2).getOrElse(at)
        case others => others
      }
    }

    val filterKeys = resolveAlias(probeKeys)

    // A scan qualifies when a filter key exists and, unless this is an index table, the key is
    // part of the given projection (same name, ignoring case, and same expression id).
    def matchScan(projectList: Seq[NamedExpression]): Boolean = {
      filterKey.exists { key =>
        isIndexTable || projectList.exists(x =>
          x.name.equalsIgnoreCase(key.name) &&
          x.exprId.id == key.exprId.id &&
          x.exprId.jvmId.equals(key.exprId.jvmId))
      }
    }

    val tableScan = carbonScan.collectFirst {
      case ProjectExec(projectList, batchData: CarbonDataSourceScan)
        if matchScan(projectList) =>
        batchData
      case ProjectExec(projectList, rowData: RowDataSourceScanExec)
        if matchScan(projectList) =>
        rowData
      case batchData: CarbonDataSourceScan
        if matchScan(batchData.output.attrs) =>
        batchData
      case rowData: RowDataSourceScanExec
        if matchScan(rowData.output) =>
        rowData
    }

    // Upper bound on the number of pushed-down values. Parsed only when it is needed; an
    // unparsable value falls back to the default instead of failing the query.
    def maxFilterRecords: Int = {
      val configured = CarbonProperties.getInstance.getProperty(
        CarbonCommonConstants.BROADCAST_RECORD_SIZE,
        CarbonCommonConstants.DEFAULT_BROADCAST_RECORD_SIZE)
      try {
        configured.toInt
      } catch {
        case _: NumberFormatException =>
          logger.warn(s"Invalid value '$configured' configured for " +
                      s"${ CarbonCommonConstants.BROADCAST_RECORD_SIZE }, using default " +
                      s"${ CarbonCommonConstants.DEFAULT_BROADCAST_RECORD_SIZE }")
          CarbonCommonConstants.DEFAULT_BROADCAST_RECORD_SIZE.toInt
      }
    }

    // Every key produces exactly one literal per row, so the filter size is known from
    // inputCopy alone. Checking first avoids building literals that would be thrown away.
    val shouldPushDown = tableScan.isDefined && keys.nonEmpty &&
      (isIndexTable || (inputCopy.length > 0 && inputCopy.length <= maxFilterRecords))

    if (shouldPushDown) {
      val filters = keys.map(key => inputCopy.map(row => toLiteral(key, row)))
      logger.info("Pushing down filter for broadcast join. Filter size:" + filters(0).length)
      tableScan.foreach {
        case scan: CarbonDataSourceScan =>
          addPushDownToCarbonRDD(scan.inputRDDs().head,
            addPushDownFilters(filterKeys, filters))
        case scan: RowDataSourceScanExec =>
          addPushDownToCarbonRDD(scan.rdd,
            addPushDownFilters(filterKeys, filters))
      }
    }
  }