in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableScanRDD.scala [235:271]
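  /**
   * Computes the rows for a single HBaseScanPartition: the partition's range
   * scans and point gets are resolved against the same table resource and
   * returned together as one Iterator[Result].
   */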
  override def compute(split: Partition, context: TaskContext): Iterator[Result] = {
    val partition = split.asInstanceOf[HBaseScanPartition]
    // Rebuild the pushed-down filter for this partition and turn each
    // contiguous row-key range into an HBase Scan.
    val filter = SerializedFilter.fromSerializedFilter(partition.sf)
    val scans = partition.scanRanges
      .map(buildScan(_, filter, columns))
    val tableResource = TableResource(relation)
    // Release scanners and the table once the task completes; the listener's
    // TaskContext argument is unused, so avoid shadowing `context`.
    context.addTaskCompletionListener[Unit](_ => close())
    val points = partition.points
    // Exact row-key lookups are served with multi-gets rather than scans; a
    // partition without points contributes an empty iterator.
    val gIt: Iterator[Result] = {
      if (points.isEmpty) {
        Iterator.empty: Iterator[Result]
      } else {
        buildGets(tableResource, points, filter, columns, hbaseContext)
      }
    }
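    // Open a scanner per range in parallel, register each scanner for
    // cleanup, expose it as a lazy iterator, and concatenate all range
    // results with the point-get results.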
    val rIts = scans.par
      .map { scan =>
        hbaseContext.applyCreds()
        val scanner = tableResource.getScanner(scan)
        rddResources.addResource(scanner)
        scanner
      }
      .map(toResultIterator(_))
      .fold(Iterator.empty: Iterator[Result]) { case (x, y) =>
        x ++ y
      } ++ gIt
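    // Make sure the shared connection cache is closed when the executor JVM
    // shuts down.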
    ShutdownHookManager.affixShutdownHook(
      new Thread() {
        override def run() {
          HBaseConnectionCache.close()
        }
      },
      0)
    rIts
  }