in core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala [131:151]
private def receiveOne(timeout: Duration, msg: String): Future[Iterable[EventData]] = {
def receiveOneWithRetry(timeout: Duration,
msg: String,
retryCount: Int): Future[Iterable[EventData]] = {
if (!receiver.getIsOpen && retryCount < RetryCount) {
val taskId = EventHubsUtils.getTaskId
logInfo(
s"(TID $taskId) receiver is not opened yet. Will retry {$retryCount} for namespaceUri: $namespaceUri " +
s"EventHubNameAndPartition: $nAndP consumer group: $consumerGroup")
val retry = retryCount + 1
after(WaitInterval.milliseconds)(receiveOneWithRetry(timeout, msg, retry))
} else {
receiver.setReceiveTimeout(timeout)
retryNotNull(receiver.receive(1), msg).map(
_.asScala
)
}
}
receiveOneWithRetry(timeout, msg, 0)
}