in connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala [709:792]
def verifyAsyncCommandCompletion(
client: Client,
database: String,
commandResult: KustoResultSetTable,
samplePeriod: FiniteDuration = KCONST.DefaultPeriodicSamplePeriod,
timeOut: FiniteDuration,
doingWhat: String,
loggerName: String,
requestId: String): Option[KustoResultSetTable] = {
commandResult.next()
val operationId = commandResult.getString(0)
val operationsShowCommand = CslCommandsGenerator.generateOperationsShowCommand(operationId)
val sampleInMillis = samplePeriod.toMillis.toInt
val timeoutInMillis = timeOut.toMillis
val delayPeriodBetweenCalls = if (sampleInMillis < 1) 1 else sampleInMillis
val stateCol = "State"
val statusCol = "Status"
val statusCheck: () => Option[KustoResultSetTable] = () => {
try {
Some(client.execute(database, operationsShowCommand).getPrimaryResults)
} catch {
case e: DataServiceException =>
if (e.isPermanent) {
val message =
s"Couldn't monitor the progress of the $doingWhat on requestId: $requestId operation from the service, you may track" +
s" it using the command '$operationsShowCommand'."
logError("verifyAsyncCommandCompletion", message)
throw new Exception(message, e)
}
logWarn(
"verifyAsyncCommandCompletion",
"Failed transiently to retrieve export status, trying again in a few seconds")
None
case _: DataClientException => None
}
}
var lastResponse: Option[KustoResultSetTable] = None
val task = doWhile[Option[KustoResultSetTable]](
func = statusCheck,
delayBeforeStart = 0,
delayBeforeEach = delayPeriodBetweenCalls,
doWhileCondition = (result: Option[KustoResultSetTable]) => {
val inProgress =
result.isEmpty || (result.get.next() && result.get.getString(stateCol) == "InProgress")
if (inProgress) {
logDebug(
loggerName,
s"Async operation $doingWhat on requestId $requestId, is in status 'InProgress'," +
"polling status again in a few seconds")
}
inProgress
},
finalWork = (result: Option[KustoResultSetTable]) => {
lastResponse = result
},
maxWaitTimeBetweenCallsMillis = ReadInitialMaxWaitTime.toMillis.toInt,
ReadMaxWaitTime.toMillis.toInt)
var success = true
if (timeOut < FiniteDuration.apply(0, SECONDS)) {
task.await()
} else {
if (!task.await(timeoutInMillis, TimeUnit.MILLISECONDS)) {
// Timed out
success = false
}
}
if (lastResponse.isEmpty || lastResponse.get.getString(stateCol) != "Completed") {
throw new FailedOperationException(
s"Failed to execute Kusto operation with OperationId '$operationId', State: '${lastResponse.get
.getString(stateCol)}'," +
s" Status: '${lastResponse.get.getString(statusCol)}'",
lastResponse)
}
if (!success) {
throw new TimeoutAwaitingPendingOperationException(
s"Timed out while waiting for operation with OperationId '$operationId'")
}
lastResponse
}