in core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsDirectDStream.scala [110:143]
override def compute(validTime: Time): Option[RDD[EventData]] = {
  val sortedExecutors = getSortedExecutorList(ssc.sparkContext)
  val numExecutors = sortedExecutors.length
  logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
  val (earliest, latest) = earliestAndLatest

  // Make sure our fromSeqNos are greater than or equal
  // to the earliest event in the service.
  fromSeqNos = fromSeqNos.map {
    case (p, seqNo) =>
      if (earliest(p) > seqNo) p -> earliest(p) else p -> seqNo
  }
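
  // Clamp each partition's upper bound so a single batch doesn't
  // consume more events than the configured per-partition max rate allows.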
  val untilSeqNos = clamp(latest)
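
  // Build one OffsetRange per partition. Hashing each NameAndPartition
  // onto the sorted executor list gives every partition a stable
  // preferred executor across batches.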
  val offsetRanges = (for {
    p <- 0 until partitionCount
    preferredLoc = if (numExecutors > 0) {
      Some(sortedExecutors(Math.floorMod(NameAndPartition(ehName, p).hashCode, numExecutors)))
    } else None
  } yield
    OffsetRange(NameAndPartition(ehName, p), fromSeqNos(p), untilSeqNos(p), preferredLoc)).toArray
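
  // Create the RDD that reads each partition's assigned sequence
  // number range from the Event Hub.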
  val rdd = new EventHubsRDD(context.sparkContext, ehConf.trimmed, offsetRanges)

  val description = offsetRanges.map(_.toString).mkString("\n")
  logInfo(s"Starting batch at $validTime for EH: $ehName with\n$description")
  val metadata =
    Map("seqNos" -> offsetRanges, StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
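
  // Advance the cursor so the next batch picks up where this one ends.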
  fromSeqNos = untilSeqNos
  Some(rdd)
}