in integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/joins/BroadCastSIFilterPushJoin.scala [150:252]
/**
 * Builds an IN-style filter from the broadcast (build) side rows of a join and
 * pushes it down into the Carbon scan on the stream side, so the scan reads
 * only the rows that can possibly match.
 *
 * @param buildPlan    physical plan of the broadcast side
 * @param carbonScan   physical plan containing the Carbon scan to push into
 * @param inputCopy    collected rows of the broadcast side
 * @param leftKeys     join keys of the left child
 * @param rightKeys    join keys of the right child
 * @param buildSide    which side of the join is broadcast
 * @param isIndexTable true when pushing into a secondary-index table scan
 *                     (bypasses the project-list match and the record-size cap)
 */
def addInFilterToPlan(buildPlan: SparkPlan,
    carbonScan: SparkPlan,
    inputCopy: Array[InternalRow],
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: CarbonToSparkAdapter.CarbonBuildSideType,
    isIndexTable: Boolean = false): Unit = {
  val carbonBuildSide = CarbonBuildSide(buildSide)
  // Join keys of the broadcast side, bound to the build plan's output so they
  // can be evaluated directly against the collected broadcast rows.
  val buildSideKeys = (if (carbonBuildSide.isLeft) leftKeys else rightKeys)
    .map(key => BindReferences.bindReference(key, buildPlan.output))
    .toArray
  // Evaluate each bound key on every broadcast row, turning the values into
  // literal expressions that will form the pushed-down IN filter.
  val filters = buildSideKeys.map { boundKey =>
    inputCopy.map { row =>
      val value = boundKey.eval(row)
      value match {
        case _: UTF8String => Literal(value.toString).asInstanceOf[Expression]
        case _: Long if boundKey.dataType.isInstanceOf[TimestampType] =>
          Literal(value, TimestampType).asInstanceOf[Expression]
        case _ => Literal(value).asInstanceOf[Expression]
      }
    }
  }
  // Keys of the stream (probe) side: the side the filter is pushed into.
  val streamSideKeys = if (carbonBuildSide.isLeft) rightKeys else leftKeys
  // First plain attribute among the stream-side keys; used below to locate
  // the matching scan node inside carbonScan.
  val filterKey = streamSideKeys.collectFirst { case attr: Attribute => attr }
  // Replaces attribute references with the child expression of any alias found
  // in carbonScan. transformExpressions is invoked only to visit the plan and
  // record aliases; its transformed result is intentionally discarded.
  def resolveAlias(expressions: Seq[Expression]): Seq[Expression] = {
    val aliasMap = new mutable.HashMap[Attribute, Expression]()
    carbonScan.transformExpressions {
      case alias: Alias =>
        aliasMap.put(alias.toAttribute, alias.child)
        alias
    }
    expressions.map {
      case ref: AttributeReference =>
        // Map.get would miss entries whose qualifier differs, so compare
        // attributes semantically instead of by key lookup.
        aliasMap.find { case (attr, _) => attr.semanticEquals(ref) } match {
          case Some((_, child)) => child
          case None => ref
        }
      case other => other
    }
  }
  val filterKeys = resolveAlias(streamSideKeys)
  // A scan node matches when the stream-side filter key appears in its project
  // list; secondary-index table scans match unconditionally.
  def matchScan(projectList: Seq[NamedExpression]): Boolean = {
    filterKey.isDefined && (isIndexTable || projectList.exists { candidate =>
      candidate.name.equalsIgnoreCase(filterKey.get.name) &&
      candidate.exprId.id == filterKey.get.exprId.id &&
      candidate.exprId.jvmId.equals(filterKey.get.exprId.jvmId)
    })
  }
  // Locate the Carbon scan (batch or row based, possibly under a project)
  // that produces the filter key.
  val tableScan = carbonScan.collectFirst {
    case ProjectExec(projectList, batchData: CarbonDataSourceScan)
      if matchScan(projectList) =>
      batchData
    case ProjectExec(projectList, rowData: RowDataSourceScanExec)
      if matchScan(projectList) =>
      rowData
    case batchData: CarbonDataSourceScan
      if matchScan(batchData.output.attrs) =>
      batchData
    case rowData: RowDataSourceScanExec
      if matchScan(rowData.output) =>
      rowData
  }
  // Upper bound on the number of broadcast records worth pushing down;
  // larger filters cost more than they save (ignored for index tables).
  val maxBroadcastRecords = CarbonProperties.getInstance.getProperty(
    CarbonCommonConstants.BROADCAST_RECORD_SIZE,
    CarbonCommonConstants.DEFAULT_BROADCAST_RECORD_SIZE)
  if (tableScan.isDefined && null != filters
      && filters.length > 0
      && ((filters(0).length > 0 && filters(0).length <= maxBroadcastRecords.toInt) ||
          isIndexTable)) {
    logger.info("Pushing down filter for broadcast join. Filter size:" + filters(0).length)
    tableScan.get match {
      case scan: CarbonDataSourceScan =>
        addPushDownToCarbonRDD(scan.inputRDDs().head,
          addPushDownFilters(filterKeys, filters))
      case other =>
        addPushDownToCarbonRDD(other.asInstanceOf[RowDataSourceScanExec].rdd,
          addPushDownFilters(filterKeys, filters))
    }
  }
}