in integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala [54:241]
override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
// load data / insert into
case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
case insert: InsertIntoCarbonTable =>
if (insert.containsMultipleInserts) {
// A successful insert into carbon returns the segment ID in a row.
// In this specific multiple-inserts scenario the Union node executes in the
// physical plan phase of the command, so the rows must be UnsafeRow objects.
// We therefore override sideEffectResult to prepare the content of the command's
// corresponding RDD from the physical plan of the insert into command.
UnionCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
} else {
ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
}
case insert: InsertIntoHadoopFsRelationCommand
if insert.catalogTable.isDefined && isCarbonTable(insert.catalogTable.get.identifier) =>
DataWritingCommandExec(DMLHelper.insertInto(insert), planLater(insert.query)) :: Nil
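// count(*) with no filter on a carbon table: when driverSideCountStar allows it,
// answer the aggregate on the driver via CarbonCountStar instead of a table scan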
case CountStarPlan(colAttr, PhysicalOperation(_, _, l: LogicalRelation))
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && driverSideCountStar(l) =>
val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
CarbonCountStar(colAttr, relation.carbonTable, SparkSession.getActiveSession.get) :: Nil
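// spatial join: the join condition is a polygon udf filter (e.g. IN_POLYGON_JOIN)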
case join: Join if join.condition.isDefined && join.condition.get.isInstanceOf[ScalaUDF] &&
isPolygonJoinUdfFilter(join.condition) =>
val condition = join.condition
if (join.joinType != Inner) {
throw new UnsupportedOperationException(
s"Polygon join udf filter is supported only with inner join, found: ${ join.joinType }")
}
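// plan the left (carbon) side with Carbon's own source strategy; the udf's first
// child is the left-side geohash column and its last child is the polygon column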
val carbon = CarbonSourceStrategy.apply(join.left).head
val leftKeys = Seq(condition.get.asInstanceOf[ScalaUDF].children.head)
val rightKeys = Seq(condition.get.asInstanceOf[ScalaUDF].children.last)
if (condition.get.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolygonJoinUDF]) {
// If the join condition is the IN_POLYGON_JOIN udf, add an implicit projection to the
// polygon table's logical plan
val tableInfo = carbon.collectFirst {
case scan: CarbonDataSourceScan => scan.inputRDDs().head
}.get.asInstanceOf[CarbonScanRDD[InternalRow]].getTableInfo
// create ToRangeListAsString udf as implicit projection with the required fields
val toRangeListUDF = new ToRangeListAsStringUDF
val dataType = StringType
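// extract the geohash attribute from the left side of the condition,
// unwrapping an implicit cast if present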
val geoHashColumn = condition.get.children.head match {
case Cast(attr: AttributeReference, _, _) =>
attr
case attr: AttributeReference =>
attr
}
// get the origin latitude and grid size from the spatial table's properties
val commonKey = CarbonCommonConstants.SPATIAL_INDEX +
CarbonCommonConstants.POINT +
geoHashColumn.name +
CarbonCommonConstants.POINT
val originLatitude = tableInfo.getFactTable
.getTableProperties
.get(commonKey + "originlatitude")
val gridSize = tableInfo.getFactTable.getTableProperties.get(commonKey + "gridsize")
if (originLatitude == null || gridSize == null) {
throw new UnsupportedOperationException(
s"Left column ${ geoHashColumn.name } of the join condition is not a GeoId column")
}
// the right side of the join condition is the polygon column; pass it to the udf
// together with the origin latitude and grid size literals
val children: Seq[Expression] = Seq(
condition.get.children.last,
Literal(originLatitude.toDouble),
Literal(gridSize.toInt))
val inputTypes: Seq[DataType] = Seq(StringType, DoubleType, IntegerType)
val rangeListScalaUdf = CarbonToSparkAdapter.createRangeListScalaUDF(toRangeListUDF,
dataType, children, inputTypes)
// add ToRangeListAsString udf column to the polygon table plan projection list
val rightSide = join.right transform {
case Project(projectList, child) =>
val positionId = UnresolvedAlias(rangeListScalaUdf)
val newProjectList = projectList :+ positionId
Project(newProjectList, child)
}
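// the appended UnresolvedAlias is unresolved, so re-run the analyzer and optimizer
// on the polygon table plan to obtain resolved output attributes for the new column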
val sparkSession = SparkSQLUtil.getSparkSession
lazy val analyzer = sparkSession.sessionState.analyzer
lazy val optimizer = sparkSession.sessionState.optimizer
val analyzedPlan = CarbonToSparkAdapter.invokeAnalyzerExecute(
analyzer, rightSide)
val polygonTablePlan = optimizer.execute(analyzedPlan)
// rewrite the join condition, replacing the polygon column with the
// ToRangeListAsString udf output column
val newCondition = condition.get transform {
case scalaUdf: ScalaUDF if scalaUdf.function.isInstanceOf[InPolygonJoinUDF] =>
val udfChildren: Seq[Expression] =
Seq(scalaUdf.children.head, polygonTablePlan.output.last)
val polygonJoinUdf = new InPolygonJoinUDF
CarbonToSparkAdapter.getTransformedPolygonJoinUdf(scalaUdf,
udfChildren, polygonJoinUdf)
}
// push down in_polygon join filter to carbon
val pushedDownJoin = BroadCastPolygonFilterPushJoin(
leftKeys,
rightKeys,
join.joinType,
Some(newCondition),
carbon,
PlanLater(polygonTablePlan)
)
FilterExec(newCondition, pushedDownJoin) :: Nil
} else {
// push down in_polygon join filter to carbon
val pushedDownJoin = BroadCastPolygonFilterPushJoin(
leftKeys,
rightKeys,
join.joinType,
condition,
carbon,
PlanLater(join.right)
)
condition.map(FilterExec(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil
}
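// inner equi join of a carbon main table (left) with its secondary index table
// (right): push the join down so the SI lookup can prune the main table scan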
case CarbonExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
if isCarbonPlan(left) && CarbonIndexUtil.checkIsIndexTable(right) =>
LOGGER.info(s"pushing down for ExtractEquiJoinKeys:right")
val carbon = CarbonSourceStrategy.apply(left).head
// in case of SI filter push join, remove the projection from the main table's
// physical plan: only the join needs the projection list, not the main table
// scan execution
var carbonChild = carbon match {
case projectExec: ProjectExec =>
projectExec.child
case _ =>
carbon
}
// remove the project only if the outer and the inner projections match
if (left.isInstanceOf[Project]) {
val leftOutput = left.output
.filterNot(_.name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID))
.map(c => (c.name.toLowerCase, c.dataType))
val childOutput = carbonChild.output
.filterNot(_.name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID))
.map(c => (c.name.toLowerCase, c.dataType))
if (!leftOutput.equals(childOutput)) {
// if the projection list and the scan list differ (e.g. because of aliases),
// we must not skip the project, so keep the original plan with the project
carbonChild = carbon
}
}
val pushedDownJoin = BroadCastSIFilterPushJoin(
leftKeys,
rightKeys,
Inner,
CarbonToSparkAdapter.getBuildRight,
carbonChild,
planLater(right),
condition)
condition.map(FilterExec(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil
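// mirrored case: the secondary index table is on the left, the main table on the right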
case CarbonExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left,
right)
if isCarbonPlan(right) && CarbonIndexUtil.checkIsIndexTable(left) =>
LOGGER.info(s"pushing down for ExtractEquiJoinKeys:left")
val carbon = CarbonSourceStrategy.apply(right).head
val pushedDownJoin =
BroadCastSIFilterPushJoin(
leftKeys,
rightKeys,
Inner,
CarbonToSparkAdapter.getBuildLeft,
planLater(left),
carbon,
condition)
condition.map(FilterExec(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil
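// left semi join (IN/EXISTS style) where both sides are carbon plans; pushed down
// only when isLeftSemiExistPushDownEnabled is set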
case CarbonExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition,
left, right)
if isLeftSemiExistPushDownEnabled &&
isAllCarbonPlan(left) && isAllCarbonPlan(right) =>
LOGGER.info(s"pushing down for ExtractEquiJoinKeysLeftSemiExist:right")
val pushedDownJoin = BroadCastSIFilterPushJoin(
leftKeys,
rightKeys,
LeftSemi,
CarbonToSparkAdapter.getBuildRight,
planLater(left),
planLater(right),
condition)
condition.map(FilterExec(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil
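// order by with limit: replace with Carbon's TakeOrderedAndProject implementation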
case ExtractTakeOrderedAndProjectExec(carbonTakeOrderedAndProjectExec) =>
carbonTakeOrderedAndProjectExec :: Nil
case _ => Nil
}
}