override def count: Long = offsetRanges.map(_.count).sum

in core/src/main/scala/org/apache/spark/eventhubs/rdd/EventHubsRDD.scala [59:89]


  override def count: Long = offsetRanges.map(_.count).sum

  override def isEmpty(): Boolean = count == 0L

  override def take(num: Int): Array[EventData] = {
    val nonEmptyPartitions =
      this.partitions.map(_.asInstanceOf[EventHubsRDDPartition]).filter(_.count > 0)

    if (num < 1 || nonEmptyPartitions.isEmpty) {
      return Array()
    }

    // Determine up front how many events to take from each partition.
    val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
      val remain = num - result.values.sum
      if (remain > 0) {
        val taken = Math.min(remain, part.count)
        result + (part.index -> taken.toInt)
      } else {
        result
      }
    }

    // Run the job only on the partitions that contribute to the result,
    // taking just the precomputed number of events from each.
    context
      .runJob(
        this,
        (tc: TaskContext, it: Iterator[EventData]) => it.take(parts(tc.partitionId)).toArray,
        parts.keys.toArray
      )
      .flatten
  }
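
The foldLeft above precomputes, on the driver, how many events take(num) must pull from each non-empty partition, so the subsequent runJob only touches partitions that actually contribute. Below is a minimal standalone sketch of that allocation step; PartInfo and the sample data are hypothetical stand-ins for EventHubsRDDPartition, and only the fold mirrors the RDD code.

  object TakeAllocationSketch {
    final case class PartInfo(index: Int, count: Long)

    // Greedily assign to each partition, in order, as many events as are
    // still needed, capped by that partition's count.
    def allocate(num: Int, partitions: Seq[PartInfo]): Map[Int, Int] =
      partitions.foldLeft(Map.empty[Int, Int]) { (result, part) =>
        val remain = num - result.values.sum
        if (remain > 0) result + (part.index -> Math.min(remain, part.count).toInt)
        else result
      }

    def main(args: Array[String]): Unit = {
      val parts = Seq(PartInfo(0, 3), PartInfo(1, 5), PartInfo(2, 2))
      // take(6): partition 0 supplies 3 events, partition 1 the remaining 3.
      println(allocate(6, parts)) // Map(0 -> 3, 1 -> 3)
    }
  }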