in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala [218:249]
override def compute(split: Partition, context: TaskContext): Iterator[Result] = {
  val partition = split.asInstanceOf[HBaseScanPartition]
  val filter = SerializedFilter.fromSerializedFilter(partition.sf)
  // Turn every scan range in this partition into an HBase Scan carrying the
  // pushed-down filter and the projected columns.
  val scans = partition.scanRanges
    .map(buildScan(_, filter, columns))
  val tableResource = TableResource(relation)
  // Release the table and other per-task resources when the Spark task completes.
  context.addTaskCompletionListener[Unit](context => close())
  val points = partition.points
  // Exact row-key lookups are served through gets rather than scans.
  val gIt: Iterator[Result] = {
    if (points.isEmpty) {
      Iterator.empty: Iterator[Result]
    } else {
      buildGets(tableResource, points, filter, columns, hbaseContext)
    }
  }
  // Open the scanners in parallel, register each one for cleanup, convert them
  // to iterators, and concatenate everything (including the get results) into
  // a single lazy Iterator[Result].
  val rIts = scans.par
    .map { scan =>
      hbaseContext.applyCreds()
      val scanner = tableResource.getScanner(scan)
      rddResources.addResource(scanner)
      scanner
    }.map(toResultIterator(_))
    .fold(Iterator.empty: Iterator[Result]) { case (x, y) =>
      x ++ y
    } ++ gIt
  // Ensure cached HBase connections are closed when the executor JVM shuts down.
  ShutdownHookManager.affixShutdownHook(new Thread() {
    override def run(): Unit = {
      HBaseConnectionCache.close()
    }
  }, 0)
  rIts
}
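
The `toResultIterator` call above adapts each `ResultScanner` into a lazy `Iterator[Result]`. A minimal sketch of such an adapter, assuming only the standard `org.apache.hadoop.hbase.client` API and omitting the `rddResources` bookkeeping the real method performs, might look like:

```scala
import org.apache.hadoop.hbase.client.{Result, ResultScanner}

// Illustrative sketch only, not the project's actual implementation: wraps a
// ResultScanner as an Iterator[Result] and closes the scanner once exhausted.
def toResultIterator(scanner: ResultScanner): Iterator[Result] = new Iterator[Result] {
  private var nextResult: Result = null
  private var done = false

  private def advance(): Unit = {
    if (!done && nextResult == null) {
      nextResult = scanner.next()   // null signals the scan is finished
      if (nextResult == null) {
        done = true
        scanner.close()             // release server-side resources promptly
      }
    }
  }

  override def hasNext: Boolean = { advance(); !done }

  override def next(): Result = {
    advance()
    val r = nextResult
    nextResult = null
    r
  }
}
```

Because the adapter is lazy, rows are only pulled from the region servers as the downstream Spark operators consume the concatenated iterator returned by `compute`.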