in core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala [245:316]
/**
 * Receives the batch of events for this receiver's partition that ends at sequence number
 * `requestSeqNo + batchSize - 1`.
 *
 * If the cached batch matches `(requestSeqNo, batchSize)`, its events are returned without
 * receiving again (this happens when several actions or writers re-execute the same stream).
 * Otherwise the first available event is fetched via `checkCursor`. Then one `receiveOne` call
 * is made for each remaining sequence number. The combined batch is sorted by sequence number,
 * validated and cached. When the first available sequence number is greater than
 * `requestSeqNo`, the batch is correspondingly shorter.
 *
 * Side effects: when `ehConf.slowPartitionAdjustment` is set, the batch timing is sent to the
 * driver. When a metric plugin is configured, the event count and total body size are reported
 * to it.
 *
 * @param requestSeqNo the sequence number the batch is requested to start at
 * @param batchSize    number of sequence numbers covered by the batch, counted from `requestSeqNo`
 * @return the received events in ascending sequence-number order; empty when the first
 *         available event is already past the end of the requested range
 * @throws java.util.concurrent.TimeoutException if fetching the first event does not complete
 *         within `ehConf.internalOperationTimeout`
 * @throws java.lang.AssertionError if the number of received events differs from the expected
 *         batch count (the batch is not cached in that case)
 */
private def receive(requestSeqNo: SequenceNumber, batchSize: Int): Iterator[EventData] = {
  val taskId = EventHubsUtils.getTaskId
  val startTimeNs = System.nanoTime()
  def elapsedTimeNs = System.nanoTime() - startTimeNs

  // first check if this call re-executes a stream that has already been received.
  // This situation could happen if multiple actions or writers are using the
  // same stream. In this case, we return the cached data.
  if (cachedData.matchSeqNoAndBatchSize(requestSeqNo, batchSize)) {
    logInfo(s"(TID $taskId) Returned data from cache for namespaceUri: $namespaceUri EventHubNameAndPartition: $nAndP " +
      s"consumer group: $consumerGroup, requestSeqNo: $requestSeqNo, batchSize: $batchSize")
    return cachedData.getCachedDataIterator()
  }

  // Retrieve the events. First, we get the first event in the batch.
  // Then, if that succeeds, we collect the rest of the data.
  // NOTE(review): assumes checkCursor always yields at least one event; `.head` throws otherwise.
  val first = Await.result(checkCursor(requestSeqNo), ehConf.internalOperationTimeout)
  val firstSeqNo = first.head.getSystemProperties.getSequenceNumber
  // The batch always ends at requestSeqNo + batchSize - 1, so if the first available event is
  // later than requested, fewer events remain to be received.
  val batchCount = (requestSeqNo + batchSize - firstSeqNo).toInt
  if (batchCount <= 0) {
    return Iterator.empty
  }

  // Receive the remaining batchCount - 1 events. They follow the first event, so the i-th
  // one is expected to carry sequence number firstSeqNo + i (used only as a diagnostic label).
  val theRest = for { i <- 1 until batchCount } yield
    awaitReceiveMessage(receiveOne(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout),
                                   s"receive; $nAndP; seqNo: ${firstSeqNo + i}"),
                        requestSeqNo)

  // Combine and sort the data. The batch is fully materialized here, so it is validated and
  // measured directly rather than through duplicated iterators (which would buffer it twice).
  val sortedSeq = (first ++ theRest.flatten).toSeq
    .sortBy(_.getSystemProperties.getSequenceNumber)

  val elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(elapsedTimeNs)
  // if slowPartitionAdjustment is on, send the partition performance for this batch to the driver
  if (ehConf.slowPartitionAdjustment) {
    sendPartitionPerformanceToDriver(
      PartitionPerformanceMetric(nAndP,
                                 EventHubsUtils.getTaskContextSlim,
                                 requestSeqNo,
                                 batchCount,
                                 elapsedTimeMs))
  }

  metricPlugin.foreach { plugin =>
    val batchSizeInBytes = sortedSeq.iterator.map(_.getBytes.length.toLong).sum
    plugin.onReceiveMetric(EventHubsUtils.getTaskContextSlim,
                           nAndP,
                           batchCount,
                           batchSizeInBytes,
                           elapsedTimeMs)
  }

  // Validate BEFORE caching. Otherwise an incomplete batch would stay cached, and a later call
  // with the same (requestSeqNo, batchSize) would be served it by the cache check above.
  assert(sortedSeq.size == batchCount,
    s"(TID $taskId) Expected $batchCount events for $nAndP starting at seqNo $firstSeqNo " +
      s"but received ${sortedSeq.size}")
  cachedData = new CachedReceivedData(requestSeqNo, batchSize, sortedSeq)

  logInfo(s"(TID $taskId) Finished receiving for namespaceUri: $namespaceUri EventHubNameAndPartition: $nAndP " +
    s"consumer group: $consumerGroup, batchSize: $batchSize, elapsed time: $elapsedTimeMs ms")
  sortedSeq.iterator
}