in core/src/main/scala/org/apache/spark/eventhubs/rdd/EventHubsRDD.scala [59:89]
// Total number of events across all offset ranges in this RDD.
override def count: Long = offsetRanges.map(_.count).sum

override def isEmpty(): Boolean = count == 0L

override def take(num: Int): Array[EventData] = {
  val nonEmptyPartitions =
    this.partitions.map(_.asInstanceOf[EventHubsRDDPartition]).filter(_.count > 0)

  if (num < 1 || nonEmptyPartitions.isEmpty) {
    return Array()
  }

  // Greedily decide how many events to take from each non-empty partition:
  // walk the partitions in order, drawing from each until `num` is covered.
  // Partitions visited after the budget is spent are left out of the map.
  val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
    val remain = num - result.values.sum
    if (remain > 0) {
      val taken = Math.min(remain, part.count)
      result + (part.index -> taken.toInt)
    } else {
      result
    }
  }

  // Run a job only on the selected partitions, take the allotted number
  // of events from each, and concatenate the per-partition results.
  context
    .runJob(
      this,
      (tc: TaskContext, it: Iterator[EventData]) => it.take(parts(tc.partitionId)).toArray,
      parts.keys.toArray
    )
    .flatten
}
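
The heart of take() is the greedy foldLeft that splits the budget of `num` events across partitions. Below is a minimal, self-contained sketch of that allocation in isolation; PartitionInfo is a hypothetical stand-in for EventHubsRDDPartition (just an index and an event count), and allocate mirrors the fold above.

// Minimal sketch of take()'s greedy allocation, under the assumption that
// a partition exposes its index and event count. PartitionInfo and
// TakeAllocationSketch are illustrative names, not part of the library.
final case class PartitionInfo(index: Int, count: Long)

object TakeAllocationSketch {
  // Walk partitions in order, drawing from each until `num` events are
  // covered; partitions visited after the budget is spent get nothing.
  def allocate(partitions: Seq[PartitionInfo], num: Int): Map[Int, Int] =
    partitions.foldLeft(Map.empty[Int, Int]) { (result, part) =>
      val remain = num - result.values.sum
      if (remain > 0) result + (part.index -> Math.min(remain.toLong, part.count).toInt)
      else result
    }

  def main(args: Array[String]): Unit = {
    val parts = Seq(PartitionInfo(0, 2L), PartitionInfo(1, 5L), PartitionInfo(2, 3L))
    println(allocate(parts, 6)) // Map(0 -> 2, 1 -> 4); partition 2 is skipped
  }
}

Because only the keys of the resulting map are passed to runJob, Spark never schedules tasks for skipped partitions, which keeps take() cheap when the first few partitions already hold enough events.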