private def receive()

in core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala [245:316]


  /**
   * Receives a batch of events for this partition, starting at `requestSeqNo`.
   *
   * If `cachedData` already matches this (`requestSeqNo`, `batchSize`) pair (e.g. several
   * actions or writers re-execute the same stream), the cached batch is returned without
   * receiving again.
   *
   * Otherwise the first event is obtained via `checkCursor`. Its sequence number may be
   * greater than `requestSeqNo`, in which case only the events up to
   * `requestSeqNo + batchSize - 1` are received, so the batch can be shorter than
   * `batchSize`. The received events are sorted by sequence number, validated, cached and
   * returned. Performance/metric reporting happens before validation, as before.
   *
   * @param requestSeqNo the sequence number the batch should start at
   * @param batchSize    the number of sequence numbers requested
   * @return the received events ordered by sequence number; empty when the first event
   *         returned by `checkCursor` is already past the requested range
   * @throws java.util.concurrent.TimeoutException if `checkCursor` does not complete within
   *         `ehConf.internalOperationTimeout` (raised by `Await.result`)
   * @throws AssertionError if the number of received events differs from the expected count
   */
  private def receive(requestSeqNo: SequenceNumber, batchSize: Int): Iterator[EventData] = {
    val taskId = EventHubsUtils.getTaskId
    val startTimeNs = System.nanoTime()
    def elapsedTimeNs = System.nanoTime() - startTimeNs

    // First check if this call re-executes a stream that has already been received.
    // This situation could happen if multiple actions or writers are using the
    // same stream. In this case, we return the cached data.
    if (cachedData.matchSeqNoAndBatchSize(requestSeqNo, batchSize)) {
      logInfo(s"(TID $taskId) Returned data from cache for namespaceUri: $namespaceUri EventHubNameAndPartition: $nAndP " +
        s"consumer group: $consumerGroup, requestSeqNo: $requestSeqNo, batchSize: $batchSize")
      return cachedData.getCachedDataIterator()
    }

    // Retrieve the events. First, we get the first event in the batch.
    // Then, if that succeeds, we collect the rest of the data.
    val first = Await.result(checkCursor(requestSeqNo), ehConf.internalOperationTimeout)
    val firstSeqNo = first.head.getSystemProperties.getSequenceNumber
    // Only the part of [requestSeqNo, requestSeqNo + batchSize) at or after firstSeqNo
    // is received, so the count shrinks when firstSeqNo > requestSeqNo.
    val batchCount = (requestSeqNo + batchSize - firstSeqNo).toInt

    if (batchCount <= 0) {
      return Iterator.empty
    }

    val theRest = for { i <- 1 until batchCount } yield
      awaitReceiveMessage(receiveOne(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout),
                                     s"receive; $nAndP; seqNo: ${requestSeqNo + i}"),
                          requestSeqNo)

    // Combine and sort the data. The materialized Seq is reused for metrics, validation,
    // caching and the returned iterator, so no iterator duplication/buffering is needed.
    val sortedSeq = (first ++ theRest.flatten).toSeq
      .sortBy(_.getSystemProperties.getSequenceNumber)

    val elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(elapsedTimeNs)

    // if slowPartitionAdjustment is on, send the partition performance for this batch to the driver
    if (ehConf.slowPartitionAdjustment) {
      sendPartitionPerformanceToDriver(
        PartitionPerformanceMetric(nAndP,
                                   EventHubsUtils.getTaskContextSlim,
                                   requestSeqNo,
                                   batchCount,
                                   elapsedTimeMs))
    }

    metricPlugin.foreach { plugin =>
      // NOTE(review): guards against getBytes returning null (e.g. a non-binary body) so a
      // single event cannot make the metric computation throw; such events count as 0 bytes.
      val batchSizeInBytes = sortedSeq.iterator
        .map(eventData => Option(eventData.getBytes).fold(0L)(_.length.toLong))
        .sum
      plugin.onReceiveMetric(EventHubsUtils.getTaskContextSlim,
                             nAndP,
                             batchCount,
                             batchSizeInBytes,
                             elapsedTimeMs)
    }

    // Validate before caching so an inconsistent batch is never served from the cache
    // to a later (e.g. retried) request for the same requestSeqNo/batchSize.
    assert(sortedSeq.size == batchCount,
      s"(TID $taskId) Received ${sortedSeq.size} events for $nAndP but expected $batchCount " +
        s"(requestSeqNo: $requestSeqNo, batchSize: $batchSize, firstSeqNo: $firstSeqNo)")

    cachedData = new CachedReceivedData(requestSeqNo, batchSize, sortedSeq)

    logInfo(s"(TID $taskId) Finished receiving for namespaceUri: $namespaceUri EventHubNameAndPartition: $nAndP " +
      s"consumer group: $consumerGroup, batchSize: $batchSize, elapsed time: $elapsedTimeMs ms")
    sortedSeq.iterator
  }