override def compute()

in core/src/main/scala/org/apache/spark/streaming/eventhubs/EventHubsDirectDStream.scala [110:143]


  /**
   * Builds the [[RDD]] of events for the batch at `validTime`.
   *
   * Steps, in order: bump the tracked `fromSeqNos` cursor up to the
   * service-side earliest sequence numbers (older events may have expired),
   * clamp the batch end via `clamp`, build one offset range per partition
   * (each with a preferred executor chosen by a deterministic hash of the
   * hub/partition pair — stable while the executor list is unchanged),
   * report batch metadata to the input-info tracker, and finally advance the
   * cursor to the batch end.
   *
   * @param validTime the batch time this RDD is generated for
   * @return an RDD covering [fromSeqNos, untilSeqNos) for every partition
   */
  override def compute(validTime: Time): Option[RDD[EventData]] = {
    val executors = getSortedExecutorList(ssc.sparkContext)
    val executorCount = executors.length
    logDebug(s"Sorted executors: ${executors.mkString(", ")}")

    val (earliest, latest) = earliestAndLatest
    // Events below `earliest` are gone from the service; never start a range
    // before that point.
    fromSeqNos = fromSeqNos.map {
      case (partition, seqNo) => partition -> math.max(seqNo, earliest(partition))
    }
    val untilSeqNos = clamp(latest)

    // One offset range per partition. The preferred location is derived from
    // a hash of (ehName, partition) so the same partition maps to the same
    // executor for a given sorted executor list.
    val offsetRanges = Array.tabulate(partitionCount) { partition =>
      val preferredLoc =
        if (executorCount > 0) {
          val idx = Math.floorMod(NameAndPartition(ehName, partition).hashCode, executorCount)
          Some(executors(idx))
        } else {
          None
        }
      OffsetRange(NameAndPartition(ehName, partition), fromSeqNos(partition), untilSeqNos(partition), preferredLoc)
    }

    val rdd = new EventHubsRDD(context.sparkContext, ehConf.trimmed, offsetRanges)

    val description = offsetRanges.map(_.toString).mkString("\n")
    logInfo(s"Starting batch at $validTime for EH: $ehName with\n$description")

    // Publish per-batch record count and ranges to the streaming listener bus
    // (shows up in the streaming UI).
    // NOTE(review): assumes EventHubsRDD.count is offset-range arithmetic, not
    // a full job over the service — TODO confirm in EventHubsRDD.
    val metadata =
      Map("seqNos" -> offsetRanges, StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    // The next batch starts exactly where this one ends.
    fromSeqNos = untilSeqNos
    Some(rdd)
  }