def moveExtentsWithRetries()

in connector/src/main/scala/com/microsoft/kusto/spark/utils/ExtendedKustoClient.scala [253:371]


  /**
   * Moves extents from `tmpTableName` into `targetTable` in batches, retrying failed batches,
   * until `totalAmount` extents have been accounted for.
   *
   * Each iteration issues an async move-extents command limited to the ingestion-time range
   * [`ingestionStartTime`, now]. It then waits on the operation via
   * `KDSU.verifyAsyncCommandCompletion`, reads the operation details and counts the moved extents.
   *
   * On failure, the retry counter grows and `handleRetryFail` supplies the next batch size and
   * delay. The retry limit depends on progress so far:
   *  - while nothing has been moved, the limit is `writeOptions.maxRetriesOnMoveExtents`;
   *  - afterwards, the limit is at least 10.
   *
   * On success, the retry counter and delay are reset. After more than two consecutive successes,
   * the batch size is doubled, capped at `batchSize` when one was given.
   *
   * @param batchSize          optional per-command batch size; `None` is forwarded to the command
   *                           generator (presumably meaning "no batch limit" — confirm)
   * @param totalAmount        number of extents expected to be moved
   * @param database           database holding both tables
   * @param tmpTableName       source table the extents are moved from
   * @param targetTable        destination table
   * @param ingestionStartTime lower bound of the time range passed to the move command
   * @param crp                client request properties used for every engine command
   * @param writeOptions       supplies the retry limit, timeout and request id
   * @throws RetriesExhaustedException when the applicable retry limit is exceeded
   */
  def moveExtentsWithRetries(
      batchSize: Option[Int],
      totalAmount: Int,
      database: String,
      tmpTableName: String,
      targetTable: String,
      ingestionStartTime: Instant,
      crp: ClientRequestProperties,
      writeOptions: WriteOptions): Unit = {
    var extentsProcessed = 0
    var retry = 0
    var curBatchSize = batchSize.getOrElse(0)
    var delayPeriodBetweenCalls = DelayPeriodBetweenCalls
    var consecutiveSuccesses = 0
    val useMaterializedViewFlag = shouldUseMaterializedViewFlag(database, targetTable, crp)
    // Retry limit while no extent has been moved yet
    val firstMoveRetries = writeOptions.maxRetriesOnMoveExtents
    // Retry limit once some extents have already been moved: never fewer than 10
    val secondMovesRetries = Math.max(10, writeOptions.maxRetriesOnMoveExtents)
    while (extentsProcessed < totalAmount) {
      // `error` is typed Object because it is forwarded to handleRetryFail as-is
      var error: Object = null
      var res: Option[KustoResultSetTable] = None
      var failed = false
      // Execute move batch and keep any transient error for handling
      try {
        val timeRange = Array[Instant](ingestionStartTime, Instant.now())
        val operation = executeEngine(
          database,
          generateTableMoveExtentsAsyncCommand(
            tmpTableName,
            targetTable,
            timeRange,
            batchSize.map(_ => curBatchSize),
            useMaterializedViewFlag),
          "extentsMove",
          crp).getPrimaryResults
        val operationResult = KDSU.verifyAsyncCommandCompletion(
          engineClient,
          database,
          operation,
          samplePeriod = KustoConstants.DefaultPeriodicSamplePeriod,
          writeOptions.timeout,
          s"move extents to destination table '$targetTable' ",
          myName,
          writeOptions.requestId)
        // TODO: use count over the show operations
        res = Some(
          executeEngine(
            database,
            generateShowOperationDetails(operationResult.get.getString(0)),
            "operationsDetailsShow",
            crp).getPrimaryResults)
        if (res.get.count() == 0) {
          failed = handleNoResults(totalAmount, extentsProcessed, database, tmpTableName, crp)
          if (!failed) {
            // No more extents to move - succeeded
            extentsProcessed = totalAmount
          }
        }
      } catch {
        // We don't check for the shouldRetry or permanent errors because we know
        // The issue is not with syntax or non-existing tables, it can only be transient
        // issues that might be solved in retries even if engine reports them as permanent
        case ex: FailedOperationException =>
          ex.getResult.foreach(result => error = result.getString("Status"))
          failed = true
        case ex: KustoDataExceptionBase =>
          error = ExceptionUtils.getStackTrace(ex)
          failed = true
      }

      // When some node fails the move - it will put "failed" as the target extent id.
      // Only inspect the result when nothing has failed yet: a failure already detected above
      // (e.g. by handleNoResults, or a FailedOperationException without a result) must not be
      // overwritten by the inspection of an (empty) result.
      if (!failed && res.isDefined) {
        val (failedInResult, errorInResult) = findErrorInResult(res.get)
        failed = failedInResult
        error = errorInResult
      }

      if (failed) {
        consecutiveSuccesses = 0
        retry += 1
        val maxRetries = if (extentsProcessed > 0) secondMovesRetries else firstMoveRetries
        if (retry > maxRetries) {
          val extentsProcessedErrorString =
            if (extentsProcessed > 0) s" and $extentsProcessed were moved" else ""
          throw RetriesExhaustedException(
            s"Failed to move extents after $retry tries$extentsProcessedErrorString.")
        }

        // Lower batch size, increase delay
        val (nextBatchSize, nextDelay) =
          handleRetryFail(curBatchSize, retry, delayPeriodBetweenCalls, targetTable, error)
        curBatchSize = nextBatchSize
        delayPeriodBetweenCalls = nextDelay
      } else {
        consecutiveSuccesses += 1
        if (consecutiveSuccesses > 2) {
          // After failures have lowered curBatchSize, raise it back (doubling) towards the
          // original batch size
          curBatchSize = Math.min(curBatchSize * 2, batchSize.getOrElse(curBatchSize * 2))
        }

        // res is always set here: no failure means no exception was thrown before assignment
        val movedThisBatch = res.get.count()
        extentsProcessed += movedThisBatch
        val batchSizeString = if (batchSize.isDefined) s"maxBatch: $curBatchSize," else ""
        KDSU.logDebug(
          myName,
          s"Moving extents batch succeeded at retry: $retry," +
            s" $batchSizeString consecutive successfull batches: $consecutiveSuccesses, successes this " +
            s"batch: $movedThisBatch," +
            s" extentsProcessed: $extentsProcessed, backoff: $delayPeriodBetweenCalls, total:$totalAmount")

        retry = 0
        delayPeriodBetweenCalls = DelayPeriodBetweenCalls
      }
    }
  }