def verifyAsyncCommandCompletion()

in connector/src/main/scala/com/microsoft/kusto/spark/utils/KustoDataSourceUtils.scala [709:792]


  /**
   * Polls the state of an asynchronous Kusto command until it is no longer reported as
   * 'InProgress', or until the given timeout elapses.
   *
   * The operation id is read from the first column of the first row of `commandResult` and is
   * used to build the operations-show command that is executed on every poll.
   *
   * @param client
   *   client used to execute the operations-show command
   * @param database
   *   database against which the operations-show command is executed
   * @param commandResult
   *   result of the async command; its first row / first column is read as the operation id
   * @param samplePeriod
   *   delay between consecutive polls; values below one millisecond are raised to 1 ms
   * @param timeOut
   *   maximum time to wait for the polling task; a negative duration waits without a limit
   * @param doingWhat
   *   short description of the operation, used in log and error messages
   * @param loggerName
   *   reporter name used for the 'InProgress' debug log line
   * @param requestId
   *   request id included in log and error messages
   * @return
   *   the last polled status response, whose 'State' column equals "Completed"
   * @throws FailedOperationException
   *   if the last polled state is anything other than "Completed", or if the wait ended without
   *   any status response being recorded
   * @throws TimeoutAwaitingPendingOperationException
   *   if the polling task did not finish within `timeOut`
   */
  def verifyAsyncCommandCompletion(
      client: Client,
      database: String,
      commandResult: KustoResultSetTable,
      samplePeriod: FiniteDuration = KCONST.DefaultPeriodicSamplePeriod,
      timeOut: FiniteDuration,
      doingWhat: String,
      loggerName: String,
      requestId: String): Option[KustoResultSetTable] = {
    commandResult.next()
    val operationId = commandResult.getString(0)
    val operationsShowCommand = CslCommandsGenerator.generateOperationsShowCommand(operationId)
    val sampleInMillis = samplePeriod.toMillis.toInt
    val timeoutInMillis = timeOut.toMillis
    // Never poll with a delay shorter than one millisecond.
    val delayPeriodBetweenCalls = if (sampleInMillis < 1) 1 else sampleInMillis

    val stateCol = "State"
    val statusCol = "Status"
    // A single poll: Some(table) on success, None when the failure is considered retryable.
    val statusCheck: () => Option[KustoResultSetTable] = () => {
      try {
        Some(client.execute(database, operationsShowCommand).getPrimaryResults)
      } catch {
        case e: DataServiceException =>
          if (e.isPermanent) {
            val message =
              s"Couldn't monitor the progress of the $doingWhat on requestId: $requestId operation from the service, you may track" +
                s" it using the command '$operationsShowCommand'."
            logError("verifyAsyncCommandCompletion", message)
            throw new Exception(message, e)
          }
          logWarn(
            "verifyAsyncCommandCompletion",
            "Failed transiently to retrieve export status, trying again in a few seconds")
          None
        // Client-side failures are treated as retryable: report "no result" and poll again.
        case _: DataClientException => None
      }
    }
    // Written by the polling task's finalWork callback, read here after awaiting the task.
    var lastResponse: Option[KustoResultSetTable] = None
    val task = doWhile[Option[KustoResultSetTable]](
      func = statusCheck,
      delayBeforeStart = 0,
      delayBeforeEach = delayPeriodBetweenCalls,
      doWhileCondition = (result: Option[KustoResultSetTable]) => {
        // A missing result (retryable failure) counts as still in progress; otherwise advance
        // to the first row and inspect its state.
        val inProgress =
          result.forall(table => table.next() && table.getString(stateCol) == "InProgress")
        if (inProgress) {
          logDebug(
            loggerName,
            s"Async operation $doingWhat on requestId $requestId, is in status 'InProgress'," +
              "polling status again in a few seconds")
        }
        inProgress
      },
      finalWork = (result: Option[KustoResultSetTable]) => {
        lastResponse = result
      },
      maxWaitTimeBetweenCallsMillis = ReadInitialMaxWaitTime.toMillis.toInt,
      ReadMaxWaitTime.toMillis.toInt)

    // A negative timeout means "wait for as long as it takes".
    val completedInTime =
      if (timeOut < FiniteDuration.apply(0, SECONDS)) {
        task.await()
        true
      } else {
        task.await(timeoutInMillis, TimeUnit.MILLISECONDS)
      }

    def timedOut = new TimeoutAwaitingPendingOperationException(
      s"Timed out while waiting for operation with OperationId '$operationId'")

    // Snapshot the shared variable once so the checks below all see the same value.
    val finalResponse = lastResponse
    finalResponse match {
      case Some(response) =>
        val state = response.getString(stateCol)
        if (state != "Completed") {
          throw new FailedOperationException(
            s"Failed to execute Kusto operation with OperationId '$operationId', State: '$state'," +
              s" Status: '${response.getString(statusCol)}'",
            finalResponse)
        }
        if (!completedInTime) {
          throw timedOut
        }
        finalResponse
      // No response was recorded. On a timeout this is the expected situation, so report the
      // timeout rather than dereferencing the empty Option while building a failure message.
      case None if !completedInTime =>
        throw timedOut
      case None =>
        throw new FailedOperationException(
          s"Failed to execute Kusto operation with OperationId '$operationId'," +
            " no status response was received",
          finalResponse)
    }
  }