private def receiveOne()

in core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala [131:151]


  private def receiveOne(timeout: Duration, msg: String): Future[Iterable[EventData]] = {
    def receiveOneWithRetry(timeout: Duration,
                            msg: String,
                            retryCount: Int): Future[Iterable[EventData]] = {
      if (!receiver.getIsOpen && retryCount < RetryCount) {
        val taskId = EventHubsUtils.getTaskId
        logInfo(
          s"(TID $taskId) receiver is not opened yet. Will retry {$retryCount} for namespaceUri: $namespaceUri " +
            s"EventHubNameAndPartition: $nAndP consumer group: $consumerGroup")

        val retry = retryCount + 1
        after(WaitInterval.milliseconds)(receiveOneWithRetry(timeout, msg, retry))
      } else {
        receiver.setReceiveTimeout(timeout)
        retryNotNull(receiver.receive(1), msg).map(
          _.asScala
        )
      }
    }
    receiveOneWithRetry(timeout, msg, 0)
  }