private def errBeginAfterEnd()

in core/src/main/scala/org/apache/spark/eventhubs/rdd/EventHubsRDD.scala [96:125]


  /**
   * Builds the message reported when a partition's starting sequence number
   * is greater than its ending sequence number.
   *
   * @param part the partition whose sequence-number range is being reported
   * @return a message naming both sequence numbers, the EventHubs name and the partition id
   */
  private def errBeginAfterEnd(part: EventHubsRDDPartition): String = {
    val begin = part.fromSeqNo
    val end = part.untilSeqNo
    val ordering =
      s"The beginning sequence number $begin is larger than the ending sequence number $end"
    val location = s"for EventHubs ${part.name} on partition ${part.partitionId}."
    s"$ordering $location"
  }

  /**
   * Produces the events for a single partition of this RDD.
   *
   * The partition is cast to [[EventHubsRDDPartition]]. When its starting and
   * ending sequence numbers are equal, an empty iterator is returned without
   * contacting a receiver. Otherwise the work is delegated to either
   * `SimulatedCachedReceiver` or `CachedEventHubsReceiver` (selected by
   * `ehConf.useSimulatedClient`), asking for `untilSeqNo - fromSeqNo` events
   * starting at `fromSeqNo`.
   *
   * @param partition the partition to compute; must be an [[EventHubsRDDPartition]]
   * @param context   the running task's context, used here only to tag log lines
   *                  with the task attempt id
   * @return an iterator over the received events, or an empty iterator for an empty range
   * @throws java.lang.ClassCastException  if `partition` is not an [[EventHubsRDDPartition]]
   * @throws java.lang.AssertionError      if `fromSeqNo` is greater than `untilSeqNo`
   *                                       (only when assertions are not elided)
   * @throws java.lang.ArithmeticException if the number of events in the range does not
   *                                       fit in an `Int`
   */
  override def compute(partition: Partition, context: TaskContext): Iterator[EventData] = {
    val part = partition.asInstanceOf[EventHubsRDDPartition]
    assert(part.fromSeqNo <= part.untilSeqNo, errBeginAfterEnd(part))

    if (part.fromSeqNo == part.untilSeqNo) {
      logInfo(
        s"(TID ${context.taskAttemptId()}) Beginning sequence number ${part.fromSeqNo} is equal to the ending sequence " +
          s"number ${part.untilSeqNo}. Returning empty partition for EH: ${part.name} " +
          s"on partition: ${part.partitionId}")
      Iterator.empty
    } else {
      logInfo(
        s"(TID ${context.taskAttemptId()}) Computing EventHubs ${part.name}, partition ${part.partitionId} " +
          s"sequence numbers ${part.fromSeqNo} => ${part.untilSeqNo}")
      // Checked narrowing: a plain `.toInt` would silently wrap when the range is
      // wider than Int.MaxValue and hand the receiver a wrong (possibly negative)
      // batch size. Failing the task explicitly is safer than reading the wrong
      // number of events.
      val batchSize = Math.toIntExact(part.untilSeqNo - part.fromSeqNo)
      val cachedReceiver = if (ehConf.useSimulatedClient) {
        SimulatedCachedReceiver
      } else {
        CachedEventHubsReceiver
      }
      cachedReceiver.receive(ehConf, part.nameAndPartition, part.fromSeqNo, batchSize)
    }
  }