override def apply()

in integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala [54:241]


  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
    plan match {
      // load data / insert into
      case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
        ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
      case insert: InsertIntoCarbonTable =>
        if (insert.containsMultipleInserts) {
          // A successful insert into carbon returns the segment ID in a row.
          // In this specific multiple-inserts scenario, the Union node executes in the
          // physical plan phase of the command, so the rows must be UnsafeRow objects.
          // Hence we override sideEffectResult to prepare the content of the command's
          // corresponding RDD from the physical plan of the insert into command.
          UnionCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
        } else {
          ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
        }
      case insert: InsertIntoHadoopFsRelationCommand
        if insert.catalogTable.isDefined && isCarbonTable(insert.catalogTable.get.identifier) =>
        DataWritingCommandExec(DMLHelper.insertInto(insert), planLater(insert.query)) :: Nil
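      // count(*) on a carbon table: when driverSideCountStar permits, answer the count
      // on the driver from segment metadata instead of scanning the table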
      case CountStarPlan(colAttr, PhysicalOperation(_, _, l: LogicalRelation))
        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && driverSideCountStar(l) =>
        val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
        CarbonCountStar(colAttr, relation.carbonTable, SparkSession.getActiveSession.get) :: Nil
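      // spatial join: the join condition is a polygon UDF filter (e.g. IN_POLYGON_JOIN)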
      case join: Join if join.condition.isDefined && join.condition.get.isInstanceOf[ScalaUDF] &&
                         isPolygonJoinUdfFilter(join.condition) =>
        val condition = join.condition
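        // polygon join filter pushdown is only supported for inner joins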
        if (join.joinType != Inner) {
          throw new UnsupportedOperationException("Unsupported query")
        }
        val carbon = CarbonSourceStrategy.apply(join.left).head
        val polygonUdf = condition.get.asInstanceOf[ScalaUDF]
        val leftKeys = Seq(polygonUdf.children.head)
        val rightKeys = Seq(polygonUdf.children.last)
        if (polygonUdf.function.isInstanceOf[InPolygonJoinUDF]) {
          // If the join condition is the IN_POLYGON_JOIN udf, add an implicit projection
          // to the polygon table's logical plan
          val tableInfo = carbon.collectFirst {
            case scan: CarbonDataSourceScan => scan.inputRDDs().head
          }.get.asInstanceOf[CarbonScanRDD[InternalRow]].getTableInfo
          // create a ToRangeListAsString udf as an implicit projection with the required fields
          val toRangeListUDF = new ToRangeListAsStringUDF
          val dataType = StringType
          val geoHashColumn = condition.get.children.head match {
            case Cast(attr: AttributeReference, _, _) =>
              attr
            case attr: AttributeReference =>
              attr
          }
          // get the origin latitude and grid size from the spatial table properties
          val commonKey = CarbonCommonConstants.SPATIAL_INDEX +
                          CarbonCommonConstants.POINT +
                          geoHashColumn.name +
                          CarbonCommonConstants.POINT
          val originLatitude = tableInfo.getFactTable
            .getTableProperties
            .get(commonKey + "originlatitude")
          val gridSize = tableInfo.getFactTable.getTableProperties.get(commonKey + "gridsize")
          if (originLatitude == null || gridSize == null) {
            throw new UnsupportedOperationException(
              s"Join condition having left column ${ geoHashColumn.name } is not a GeoId column")
          }
          // the right side of the join condition is the polygon column; pass it to the
          // udf together with the origin latitude and grid size literals
          val children: Seq[Expression] = Seq(condition.get.children.last,
            Literal(originLatitude.toDouble),
            Literal(gridSize.toInt))

          val inputTypes: Seq[DataType] = Seq(StringType, DoubleType, IntegerType)
          val rangeListScalaUdf = CarbonToSparkAdapter.createRangeListScalaUDF(toRangeListUDF,
            dataType, children, inputTypes)
          // add ToRangeListAsString udf column to the polygon table plan projection list
          val rightSide = join.right transform {
            case Project(projectList, child) =>
              val positionId = UnresolvedAlias(rangeListScalaUdf)
              val newProjectList = projectList :+ positionId
              Project(newProjectList, child)
          }
          val sparkSession = SparkSQLUtil.getSparkSession
          lazy val analyzer = sparkSession.sessionState.analyzer
          lazy val optimizer = sparkSession.sessionState.optimizer
          val analyzedPlan = CarbonToSparkAdapter.invokeAnalyzerExecute(
            analyzer, rightSide)
          val polygonTablePlan = optimizer.execute(analyzedPlan)
          // transform join condition by replacing polygon column with ToRangeListAsString udf
          // column output
          val newCondition = condition.get transform {
            case scalaUdf: ScalaUDF if scalaUdf.function.isInstanceOf[InPolygonJoinUDF] =>
              val udfChildren = Seq(scalaUdf.children.head, polygonTablePlan.output.last)
              val polygonJoinUdf = new InPolygonJoinUDF
              CarbonToSparkAdapter.getTransformedPolygonJoinUdf(scalaUdf,
                udfChildren, polygonJoinUdf)
          }
          // push down in_polygon join filter to carbon
          val pushedDownJoin = BroadCastPolygonFilterPushJoin(
            leftKeys,
            rightKeys,
            join.joinType,
            Some(newCondition),
            carbon,
            PlanLater(polygonTablePlan)
          )
          FilterExec(newCondition, pushedDownJoin) :: Nil
        } else {
          // push down in_polygon join filter to carbon
          val pushedDownJoin = BroadCastPolygonFilterPushJoin(
            leftKeys,
            rightKeys,
            join.joinType,
            condition,
            carbon,
            PlanLater(join.right)
          )
          condition.map(FilterExec(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil
        }
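      // inner equi join whose right side is a secondary index (SI) table: plan a
      // broadcast filter push join so the SI output prunes the main table scan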
      case CarbonExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
        if isCarbonPlan(left) && CarbonIndexUtil.checkIsIndexTable(right) =>
        LOGGER.info("pushing down for ExtractEquiJoinKeys:right")
        val carbon = CarbonSourceStrategy.apply(left).head
        // In case of SI filter push join, remove the projection list from the physical
        // plan: the main table physical plan execution does not need the project list,
        // since only the join uses it.
        var carbonChild = carbon match {
          case projectExec: ProjectExec =>
            projectExec.child
          case _ =>
            carbon
        }
        // remove the project only if the outer and the inner projections match
        if (left.isInstanceOf[Project]) {
          val leftOutput = left.output
            .filterNot(_.name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID))
            .map(c => (c.name.toLowerCase, c.dataType))
          val childOutput = carbonChild.output
            .filterNot(_.name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID))
            .map(c => (c.name.toLowerCase, c.dataType))
          if (!leftOutput.equals(childOutput)) {
            // if the projection list and the scan list differ (e.g. due to an alias),
            // we must not skip the project, so keep the original plan with the project
            carbonChild = carbon
          }
        }
        val pushedDownJoin = BroadCastSIFilterPushJoin(
          leftKeys,
          rightKeys,
          Inner,
          CarbonToSparkAdapter.getBuildRight,
          carbonChild,
          planLater(right),
          condition)
        condition.map(FilterExec(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil
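      // mirror of the previous case: the secondary index table is on the left side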
      case CarbonExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
        if isCarbonPlan(right) && CarbonIndexUtil.checkIsIndexTable(left) =>
        LOGGER.info("pushing down for ExtractEquiJoinKeys:left")
        val carbon = CarbonSourceStrategy.apply(right).head
        val pushedDownJoin =
          BroadCastSIFilterPushJoin(
            leftKeys,
            rightKeys,
            Inner,
            CarbonToSparkAdapter.getBuildLeft,
            planLater(left),
            carbon,
            condition)
        condition.map(FilterExec(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil
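      // left semi join (e.g. IN/EXISTS rewrites) where both sides are carbon plans,
      // pushed down when the left-semi-exist pushdown property is enabled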
      case CarbonExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right)
        if isLeftSemiExistPushDownEnabled &&
          isAllCarbonPlan(left) && isAllCarbonPlan(right) =>
        LOGGER.info("pushing down for ExtractEquiJoinKeysLeftSemiExist:right")
        val pushedDownJoin = BroadCastSIFilterPushJoin(
          leftKeys,
          rightKeys,
          LeftSemi,
          CarbonToSparkAdapter.getBuildRight,
          planLater(left),
          planLater(right),
          condition)
        condition.map(FilterExec(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil
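      // order by + limit (and optional project) mapped to carbon's
      // TakeOrderedAndProject physical operator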
      case ExtractTakeOrderedAndProjectExec(carbonTakeOrderedAndProjectExec) =>
        carbonTakeOrderedAndProjectExec :: Nil
      case _ => Nil
    }
  }
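
For orientation, here is a minimal sketch of how a strategy of this shape can be exercised end to end. The table and column names (fact_geo, polygon_tab, mygeohash, poly) are hypothetical, registering through experimental.extraStrategies stands in for the wiring the CarbonData session extensions normally do, and it assumes DMLStrategy is exposed as an object extending SparkStrategy.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("dml-strategy-sketch")
    .getOrCreate()

  // Strategies in extraStrategies are consulted before Spark's built-in ones,
  // so apply() above gets the first chance to plan carbon-specific nodes.
  spark.experimental.extraStrategies = Seq(DMLStrategy)

  // A query shape that would reach the IN_POLYGON_JOIN branch of apply():
  // an inner join whose condition is the polygon join UDF (hypothetical tables).
  spark.sql(
    """SELECT f.id, p.poly
      |FROM fact_geo f
      |JOIN polygon_tab p
      |ON IN_POLYGON_JOIN(f.mygeohash, p.poly)""".stripMargin).show()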