in connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala [660:706]
/**
 * Polls `func` on a dedicated `java.util.Timer` thread with exponential backoff until
 * `doWhileCondition` returns false for its result, then hands that result to `finalWork`.
 *
 * The call returns immediately; all work happens on the timer thread. The returned latch (count
 * 1) is released exactly once, when polling stops for any reason:
 *   - `doWhileCondition` returned false and `finalWork` finished, or
 *   - `func`, `doWhileCondition`, `finalWork` or the re-scheduling itself threw.
 *
 * In the failure case the throwable propagates on the timer thread only; it is NOT delivered to
 * the caller, and the latch alone does not tell success from failure. Callers that need the
 * outcome must record it themselves (e.g. inside `func` / `finalWork`).
 *
 * Backoff: the delay starts from `delayBeforeEach` and is doubled before every re-schedule, so
 * the first repeat happens after `2 * delayBeforeEach` (subject to the ceiling). The ceiling is
 * `maxWaitTimeBetweenCallsMillis` until the accumulated backoff exceeds one minute, and
 * `maxWaitTimeAfterMinute` from then on. Note that a `delayBeforeEach` of 0 never grows
 * (0 doubled is 0), which makes the task re-run back to back.
 *
 * @param func                          the operation to poll; invoked once per iteration
 * @param delayBeforeStart              delay before the first invocation, in milliseconds
 * @param delayBeforeEach               seed for the backoff delay, in milliseconds
 * @param doWhileCondition              returns true while polling should continue
 * @param finalWork                     invoked once with the first result that ends the polling
 * @param maxWaitTimeBetweenCallsMillis backoff ceiling during the first minute of backoff
 * @param maxWaitTimeAfterMinute        backoff ceiling once more than a minute has accumulated
 * @tparam A                            the result type produced by `func`
 * @return a latch that reaches zero when polling has stopped
 */
def doWhile[A](
    func: () => A,
    delayBeforeStart: Long,
    delayBeforeEach: Int,
    doWhileCondition: A => Boolean,
    finalWork: A => Unit,
    maxWaitTimeBetweenCallsMillis: Int,
    maxWaitTimeAfterMinute: Int): CountDownLatch = {
  val latch = new CountDownLatch(1)
  val t = new Timer()
  // Mutable backoff state. It is only touched from the single timer thread once scheduling
  // has started, so no extra synchronization is needed.
  var currentWaitTime = delayBeforeEach
  // Long so that very long-running polls cannot wrap the accumulator.
  var waitedTime = 0L
  var maxWaitTime = maxWaitTimeBetweenCallsMillis

  class ExponentialBackoffTask extends TimerTask {
    def run(): Unit = {
      // Stays false on every terminal path (normal completion or any throwable), so the
      // finally block below always releases the waiter in those cases.
      var rescheduled = false
      try {
        val res = func()
        if (doWhileCondition(res)) {
          waitedTime += currentWaitTime
          if (waitedTime > TimeUnit.MINUTES.toMillis(1)) {
            maxWaitTime = maxWaitTimeAfterMinute
          }
          // Double in Long arithmetic: doubling an Int close to Int.MaxValue would wrap to a
          // negative delay, which Timer.schedule rejects with IllegalArgumentException.
          currentWaitTime = math.min(currentWaitTime.toLong * 2L, maxWaitTime.toLong).toInt
          t.schedule(new ExponentialBackoffTask(), currentWaitTime)
          rescheduled = true
        } else {
          finalWork(res)
        }
      } finally {
        if (!rescheduled) {
          // Using finally rather than `catch Exception` guarantees the latch is released for
          // Errors too (e.g. NotImplementedError, NoClassDefFoundError); otherwise the timer
          // thread would die silently and anyone awaiting the latch would block forever.
          latch.countDown()
          t.cancel()
        }
      }
    }
  }

  t.schedule(new ExponentialBackoffTask(), delayBeforeStart)
  latch
}