in core/src/main/scala/org/apache/spark/eventhubs/rdd/EventHubsRDD.scala [96:125]
private def errBeginAfterEnd(part: EventHubsRDDPartition): String =
s"The beginning sequence number ${part.fromSeqNo} is larger than the ending " +
s"sequence number ${part.untilSeqNo} for EventHubs ${part.name} on partition " +
s"${part.partitionId}."
override def compute(partition: Partition, context: TaskContext): Iterator[EventData] = {
val part = partition.asInstanceOf[EventHubsRDDPartition]
assert(part.fromSeqNo <= part.untilSeqNo, errBeginAfterEnd(part))
if (part.fromSeqNo == part.untilSeqNo) {
logInfo(
s"(TID ${context.taskAttemptId()}) Beginning sequence number ${part.fromSeqNo} is equal to the ending sequence " +
s"number ${part.untilSeqNo}. Returning empty partition for EH: ${part.name} " +
s"on partition: ${part.partitionId}")
Iterator.empty
} else {
logInfo(
s"(TID ${context.taskAttemptId()}) Computing EventHubs ${part.name}, partition ${part.partitionId} " +
s"sequence numbers ${part.fromSeqNo} => ${part.untilSeqNo}")
val cachedReceiver = if (ehConf.useSimulatedClient) {
SimulatedCachedReceiver
} else {
CachedEventHubsReceiver
}
cachedReceiver.receive(ehConf,
part.nameAndPartition,
part.fromSeqNo,
(part.untilSeqNo - part.fromSeqNo).toInt)
}
}