in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala [344:442]
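/**
 * Builds the RDD that backs a Spark SQL scan of this relation. Spark supplies the
 * projected columns and the predicates it could push down (the PrunedFilteredScan
 * contract); these are translated into HBase row-key points/ranges plus an optional
 * server-side column filter, with a full table scan as the fallback.
 */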
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  val pushDownTuple = buildPushDownPredicatesResource(filters)
  val pushDownRowKeyFilter = pushDownTuple._1
  var pushDownDynamicLogicExpression = pushDownTuple._2
  val valueArray = pushDownTuple._3

  // Column push-down can be disabled by configuration; the row-key filter still applies.
  if (!usePushDownColumnFilter) {
    pushDownDynamicLogicExpression = null
  }

  logDebug("pushDownRowKeyFilter: " + pushDownRowKeyFilter.ranges)
  if (pushDownDynamicLogicExpression != null) {
    logDebug(
      "pushDownDynamicLogicExpression: " +
        pushDownDynamicLogicExpression.toExpressionString)
  }
  logDebug("valueArray: " + valueArray.length)
  val requiredQualifierDefinitionList =
    new mutable.MutableList[Field]

  requiredColumns.foreach(c => {
    val field = catalog.getField(c)
    requiredQualifierDefinitionList += field
  })

  // retain the information for unit testing checks
  DefaultSourceStaticUtils.populateLatestExecutionRules(
    pushDownRowKeyFilter,
    pushDownDynamicLogicExpression)
  val getList = new util.ArrayList[Get]()

  // add points to getList (note: getList is built here but never read in this method;
  // the same points are handed to the scan RDD via addPoint below)
  pushDownRowKeyFilter.points.foreach(p => {
    val get = new Get(p)
    requiredQualifierDefinitionList.foreach(d => {
      if (d.isRowKey) {
        get.addColumn(d.cfBytes, d.colBytes)
      }
    })
    getList.add(get)
  })
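
  // When column push-down is enabled, wrap the dynamic logic expression and its
  // comparison values in a SparkSQLPushDownFilter so filtering can run server-side.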
  val pushDownFilterJava =
    if (usePushDownColumnFilter && pushDownDynamicLogicExpression != null) {
      Some(
        new SparkSQLPushDownFilter(
          pushDownDynamicLogicExpression,
          valueArray,
          requiredQualifierDefinitionList,
          encoderClsName))
    } else {
      None
    }
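
  // Seed the scan RDD with the row-key points and ranges derived from the predicates.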
  val hRdd = new HBaseTableScanRDD(
    this,
    hbaseContext,
    pushDownFilterJava,
    requiredQualifierDefinitionList.seq)
  pushDownRowKeyFilter.points.foreach(hRdd.addPoint(_))
  pushDownRowKeyFilter.ranges.foreach(hRdd.addRange(_))
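
  // Turn scan results into Spark Rows; if the planned scan yields no partitions,
  // fall back to a full table scan over the required columns.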
  // Compute the projected field list once rather than once per row.
  val indexedFields = getIndexedProjections(requiredColumns).map(_._1)
  val scanRDD = hRdd.map(r => buildRow(indexedFields, r))
  if (scanRDD.partitions.nonEmpty) {
    scanRDD
  } else {
    val scan = new Scan()
    scan.setCacheBlocks(blockCacheEnable)
    scan.setBatch(batchNum)
    scan.setCaching(cacheSize)
    requiredQualifierDefinitionList.foreach(d => scan.addColumn(d.cfBytes, d.colBytes))
    hbaseContext
      .hbaseRDD(TableName.valueOf(tableName), scan)
      .map(r => buildRow(indexedFields, r._2))
  }
}
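
// Usage sketch (illustrative, not part of this file): the push-down path above is
// exercised when Spark can translate a DataFrame predicate via
// buildPushDownPredicatesResource. `cat` stands for a table catalog JSON defined
// elsewhere; the option key and format name are the ones this connector registers.
//
//   val df = spark.read
//     .options(Map(HBaseTableCatalog.tableCatalog -> cat))
//     .format("org.apache.hadoop.hbase.spark")
//     .load()
//   df.filter("col0 > 'row005'").select("col0", "col1").show()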