in connector/src/main/scala/com/microsoft/kusto/spark/utils/ExtendedKustoClient.scala [253:371]
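/**
 * Moves extents from the temporary ingestion table into the target table in batches.
 * Transient failures are retried with a shrinking batch size and a growing delay between
 * calls; a RetriesExhaustedException is thrown once the retry budget is exceeded.
 */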
def moveExtentsWithRetries(
batchSize: Option[Int],
totalAmount: Int,
database: String,
tmpTableName: String,
targetTable: String,
ingestionStartTime: Instant,
crp: ClientRequestProperties,
writeOptions: WriteOptions): Unit = {
var extentsProcessed = 0
var retry = 0
var curBatchSize = batchSize.getOrElse(0)
var delayPeriodBetweenCalls = DelayPeriodBetweenCalls
var consecutiveSuccesses = 0
val useMaterializedViewFlag = shouldUseMaterializedViewFlag(database, targetTable, crp)
val firstMoveRetries = writeOptions.maxRetriesOnMoveExtents
val secondMovesRetries = Math.max(10, writeOptions.maxRetriesOnMoveExtents)
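// Keep issuing async move-extents commands until every extent has been moved,
// adapting the batch size and the delay between calls based on failures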
while (extentsProcessed < totalAmount) {
var error: Object = null
var res: Option[KustoResultSetTable] = None
var failed = false
// Execute move batch and keep any transient error for handling
try {
val timeRange = Array[Instant](ingestionStartTime, Instant.now())
val operation = executeEngine(
database,
generateTableMoveExtentsAsyncCommand(
tmpTableName,
targetTable,
timeRange,
batchSize.map(_ => curBatchSize),
useMaterializedViewFlag),
"extentsMove",
crp).getPrimaryResults
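// Wait for the async move operation to finish, polling periodically up to the configured timeout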
val operationResult = KDSU.verifyAsyncCommandCompletion(
engineClient,
database,
operation,
samplePeriod = KustoConstants.DefaultPeriodicSamplePeriod,
writeOptions.timeout,
s"move extents to destination table '$targetTable' ",
myName,
writeOptions.requestId)
// TODO: use count over the show operations
res = Some(
executeEngine(
database,
generateShowOperationDetails(operationResult.get.getString(0)),
"operationsDetailsShow",
crp).getPrimaryResults)
if (res.get.count() == 0) {
failed = handleNoResults(totalAmount, extentsProcessed, database, tmpTableName, crp)
if (!failed) {
// No more extents to move - succeeded
extentsProcessed = totalAmount
}
}
} catch {
// We don't check shouldRetry or for permanent errors here because the issue cannot be
// a syntax error or a missing table; it can only be a transient issue that retries may
// resolve, even if the engine reports it as permanent
case ex: FailedOperationException =>
if (ex.getResult.isDefined) {
error = ex.getResult.get.getString("Status")
}
failed = true
case ex: KustoDataExceptionBase =>
error = ExceptionUtils.getStackTrace(ex)
failed = true
}
// When a node fails the move, it puts "failed" as the target extent id
if (res.isDefined && error == null) {
val errorInResult = findErrorInResult(res.get)
failed = errorInResult._1
error = errorInResult._2
}
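// Failure path: shrink the batch and back off before retrying; abort once the retry budget is exhausted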
if (failed) {
consecutiveSuccesses = 0
retry += 1
val extentsProcessedErrorString =
if (extentsProcessed > 0) s" and $extentsProcessed extents were already moved" else ""
// Allow more retries once some extents have already been moved successfully
val maxRetries = if (extentsProcessed > 0) secondMovesRetries else firstMoveRetries
if (retry > maxRetries) {
throw RetriesExhaustedException(
s"Failed to move extents after $retry tries$extentsProcessedErrorString.")
}
// Lower batch size, increase delay
val params =
handleRetryFail(curBatchSize, retry, delayPeriodBetweenCalls, targetTable, error)
curBatchSize = params._1
delayPeriodBetweenCalls = params._2
} else {
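// Success path: record the moved extents and gradually restore batch size and polling delay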
consecutiveSuccesses += 1
if (consecutiveSuccesses > 2) {
// After the batch size was lowered on failures, grow it back up, capped at the original batch size
curBatchSize = Math.min(curBatchSize * 2, batchSize.getOrElse(curBatchSize * 2))
}
extentsProcessed += res.get.count()
val batchSizeString = if (batchSize.isDefined) s"maxBatch: $curBatchSize," else ""
KDSU.logDebug(
myName,
s"Moving extents batch succeeded at retry: $retry," +
s" $batchSizeString consecutive successfull batches: $consecutiveSuccesses, successes this " +
s"batch: ${res.get.count()}," +
s" extentsProcessed: $extentsProcessed, backoff: $delayPeriodBetweenCalls, total:$totalAmount")
retry = 0
delayPeriodBetweenCalls = DelayPeriodBetweenCalls
}
}
}