in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala [311:392]
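
  /**
   * Entry point for Spark SQL's PrunedFilteredScan contract: Spark supplies the
   * columns the query actually needs plus the filters it managed to extract,
   * and this method returns an RDD[Row] honoring both, pushing as much of the
   * work as possible down to HBase.
   */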
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {

    val pushDownTuple = buildPushDownPredicatesResource(filters)
    val pushDownRowKeyFilter = pushDownTuple._1
    var pushDownDynamicLogicExpression = pushDownTuple._2
    val valueArray = pushDownTuple._3
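
    // When column-filter pushdown is disabled, drop the value-level expression;
    // the row-key points and ranges extracted above still prune the scan.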
    if (!usePushDownColumnFilter) {
      pushDownDynamicLogicExpression = null
    }
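
    // Record what was derived from the Spark filters, for debugging.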
logDebug("pushDownRowKeyFilter: " + pushDownRowKeyFilter.ranges)
if (pushDownDynamicLogicExpression != null) {
logDebug("pushDownDynamicLogicExpression: " +
pushDownDynamicLogicExpression.toExpressionString)
}
logDebug("valueArray: " + valueArray.length)
    val requiredQualifierDefinitionList =
      new mutable.MutableList[Field]

    requiredColumns.foreach { c =>
      requiredQualifierDefinitionList += catalog.getField(c)
    }

    // retain the information for unit testing checks
    DefaultSourceStaticUtils.populateLatestExecutionRules(pushDownRowKeyFilter,
      pushDownDynamicLogicExpression)
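
    // Note: getList is populated below but never read in this method, and
    // rddList is never used at all; both appear to be leftovers.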
    val getList = new util.ArrayList[Get]()
    val rddList = new util.ArrayList[RDD[Row]]()

    // Build one Get per exact row-key point produced by the pushdown analysis.
    pushDownRowKeyFilter.points.foreach { p =>
      val get = new Get(p)
      requiredQualifierDefinitionList.foreach { d =>
        // The row key itself is already supplied to the Get constructor, so only
        // real column-family/qualifier pairs need to be requested explicitly.
        if (!d.isRowKey) {
          get.addColumn(d.cfBytes, d.colBytes)
        }
      }
      getList.add(get)
    }
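
    // Wrap the remaining value-level predicate in a serializable custom HBase
    // filter, so it can be evaluated on the region servers rather than in Spark.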
    val pushDownFilterJava =
      if (usePushDownColumnFilter && pushDownDynamicLogicExpression != null) {
        Some(new SparkSQLPushDownFilter(pushDownDynamicLogicExpression,
          valueArray, requiredQualifierDefinitionList, encoderClsName))
      } else {
        None
      }
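
    // Scan RDD over this relation; the pushed-down points and ranges restrict
    // which slices of the table the RDD actually reads.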
    val hRdd = new HBaseTableScanRDD(this, hbaseContext, pushDownFilterJava,
      requiredQualifierDefinitionList.seq)
    pushDownRowKeyFilter.points.foreach(hRdd.addPoint)
    pushDownRowKeyFilter.ranges.foreach(hRdd.addRange)
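
    // Convert HBase Results to Spark Rows. If the scan RDD ends up with zero
    // partitions, treat the pushdown as unusable and fall back to a full scan.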
    var resultRDD: RDD[Row] = {
      // Compute the projection once on the driver instead of once per record.
      val indexedFields = getIndexedProjections(requiredColumns).map(_._1)
      val tmp = hRdd.map { r =>
        buildRow(indexedFields, r)
      }
      if (tmp.partitions.nonEmpty) {
        tmp
      } else {
        null
      }
    }
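
    // Fallback: no usable pushdown partitions, so scan the whole table with the
    // configured caching/batching and project only the required columns.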
    if (resultRDD == null) {
      val scan = new Scan()
      scan.setCacheBlocks(blockCacheEnable)
      scan.setBatch(batchNum)
      scan.setCaching(cacheSize)
      requiredQualifierDefinitionList.foreach { d =>
        // As above, the row key is not a real column and cannot be requested.
        if (!d.isRowKey) {
          scan.addColumn(d.cfBytes, d.colBytes)
        }
      }

      val indexedFields = getIndexedProjections(requiredColumns).map(_._1)
      val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map { r =>
        buildRow(indexedFields, r._2)
      }
      resultRDD = rdd
    }

    resultRDD
  }
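
  // Usage sketch (hypothetical session and catalog names): buildScan is not
  // called directly; Spark SQL invokes it when planning a query against a
  // DataFrame backed by this relation, roughly like:
  //
  //   val df = spark.read
  //     .options(Map(HBaseTableCatalog.tableCatalog -> catalogJson))
  //     .format("org.apache.hadoop.hbase.spark")
  //     .load()
  //   df.select("col0").where("col1 < 10").show()
  //
  // The select drives requiredColumns, and the where clause arrives here as the
  // filters array to be pushed down.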