in connector/src/main/scala/com/microsoft/kusto/spark/utils/ExtendedKustoClient.scala [215:251]
def moveExtents(
database: String,
tmpTableName: String,
targetTable: String,
crp: ClientRequestProperties,
writeOptions: WriteOptions,
sinkStartTime: Instant): Unit = {
val extentsCountQuery =
executeEngine(database, generateExtentsCountCommand(tmpTableName), "countExtents", crp).getPrimaryResults
extentsCountQuery.next()
val extentsCount = extentsCountQuery.getInt(0)
if (extentsCount > writeOptions.minimalExtentsCountForSplitMerge) {
val nodeCountQuery =
executeEngine(database, generateNodesCountCommand(),"nodesCount", crp).getPrimaryResults
nodeCountQuery.next()
val nodeCount = nodeCountQuery.getInt(0)
moveExtentsWithRetries(
Some(nodeCount * writeOptions.minimalExtentsCountForSplitMerge),
extentsCount,
database,
tmpTableName,
targetTable,
sinkStartTime,
crp,
writeOptions)
} else {
moveExtentsWithRetries(
None,
extentsCount,
database,
tmpTableName,
targetTable,
sinkStartTime,
crp,
writeOptions)
}
}