in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java [2296:2512]
void detectFragmentation(HashMap<IRmJob, Integer> needed_by_job)
{
String methodName = "detectFragmentation";
if ( logger.isDebug() ) {
logger.debug(methodName, null, "vMachines:", fmtArray(globalNodepool.cloneVMachinesByOrder()));
}
List<NodePool> poollist = new ArrayList<NodePool>();
getNodepools(globalNodepool, poollist);
NodePool[] allPools = poollist.toArray(new NodePool[poollist.size()]);
if ( logger.isDebug() ) {
// make sure the node pool list is built correctly
StringBuffer sb = new StringBuffer("Nodepools:");
for ( NodePool np : allPools ) {
sb.append(np.getId());
sb.append(" ");
}
logger.debug(methodName, null, sb.toString());
}
// These next two maps are built lazily, once per call to this routine
Map<String, int[]> vshares = new HashMap<String, int[]>(); // Virtual shares of each order in each nodepool.
// This gets enhanced with pending evictions so we
// can tell whether enough space is accounted for
// for each job.
Map<String, int[]> nshares = new HashMap<String, int[]>(); // For each order, the number of shares available,
// either directly, or through splits from higher-order
// space, enhanced with pending evictions and purges.
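// Both tables are indexed by share order (index 0 is unused): vshares[o] counts machines whose
// largest potential hole is exactly o quanta, while nshares[o] also counts the shares obtainable
// by splitting larger holes.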
Map<String, Map<IRmJob, Integer>> jobs = new HashMap<String, Map<IRmJob, Integer>>();
for ( int npi = 0; npi < allPools.length; npi++ ) { // Turns out to be depth-first traversal !
// First pass, init the structures, including any free space that may have been unusable
NodePool np = allPools[npi];
String id = np.getId();
int[] vmach = globalNodepool.makeArray();
int[] nmach = globalNodepool.makeArray();
Map<IRmJob, Integer> jobmap = new HashMap<IRmJob, Integer>();
vshares.put(id, vmach);
nshares.put(id, nmach);
jobs.put(id, jobmap);
}
boolean must_defrag = false;
String headerfmt = "%14s %20s %6s %4s %7s %6s %2s";
String datafmt = "%14s %20s %6d %4d %7d %6d %2d";
for ( ResourceClass rc : resourceClasses.values() ) {
// Next: Look at every job and work out its "need". Collect jobs by nodepool into the jobmaps.
Map<IRmJob, IRmJob> allJobs = rc.getAllJobs();
String npn = rc.getNodepoolName();
Map<IRmJob, Integer> jobmap = jobs.get(npn);
if ( allJobs.size() == 0 ) continue;
logger.info(methodName, null, String.format(headerfmt, "Nodepool", "User", "PureFS", "NSh", "Counted", "Needed", "O"), "Class:", rc.getName());
for ( IRmJob j : allJobs.values() ) {
if ( j.isRefused() ) {
continue;
}
if ( j.isDeferred() ) {
continue;
}
int counted = j.countNSharesGiven(); // accounting for ramp-up, various caps, etc.
int current = j.countNShares(); // currently allocated, plus pending, less those removed by earlier preemption
int needed = counted - current; // could go negative if its evicting
int order = j.getShareOrder();
if ( j.getSchedulingPolicy() == Policy.FAIR_SHARE ) { // cap on frag threshold
if ( current >= fragmentationThreshold ) {
needed = 0;
} else if ( current >= j.getPureFairShare() ) { // more than our pure share, we're not needy
needed = 0;
} else if ( needed < 0 ) { // more than our count, likely evicting
needed = 0;
} else if ( needed > 0) {
needed = Math.min(needed, fragmentationThreshold);
jobmap.put(j, needed); // we'll log this in a minute, not here
must_defrag = true;
}
} else { // if not fair-share, must always try to defrag if needed
// A non-preemptable job that is already running has its full allocation and therefore cannot be needy.
// UIMA-4275 We rely on quotas to keep 'needed' under control
if ( needed > 0 ) {
jobmap.put(j, needed);
must_defrag = true;
}
}
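// Example (illustrative values): with fragmentationThreshold=8, a fair-share job whose pure fair
// share is 6, with counted=10 and current=3, gets needed = min(10 - 3, 8) = 7 and is recorded as
// potentially needy; had current already reached 6 (its pure share) or 8 (the threshold), needed
// would have been forced to 0.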
logger.info(methodName, j.getId(),
String.format(datafmt,
npn,
j.getUser().getName(),
j.getPureFairShare(),
current,
counted,
needed,
order),
(needed > 0) ? "POTENTIALLY NEEDY" : ""
);
// String.format("NP: %10s User %8s Pure fs[%3d] Nshares[%3d] AsgnNshares[%3d] needed[%3d] O[%d] %s",
// npn,
// j.getUser().getName(),
// j.getPureFairShare(),
// current,
// counted,
// needed,
// order,
// (needed > 0) ? "POTENTIALLY NEEDY" : ""
// ));
}
}
if ( ! must_defrag ) return; // the rest of this is expensive, let's bypass if we can
for ( int npi = 0; npi < allPools.length; npi++ ) { // Turns out to be depth-first traversal !
// Next pass, find all the open and potentially open spots in each nodepool. We need to coalesce
// evicted shares with each other and with open space on each machine in order to know whether the
// space is potentially usable by jobs.
NodePool np = allPools[npi];
Map<Node, Machine> machs = np.getAllMachinesForPool();
for ( Machine m : machs.values() ) {
int free = m.countFreedUpShares(); // free space plus evicted shares, i.e. the machine's eventual free space
if ( free != 0 ) {
logger.trace(methodName, null, "Freed shares", free, "on machine", m.getId(), "in nodepool", np.getId());
for ( NodePool npj = np; npj != null; npj = npj.getParent() ) { // must propagate up because of how these tables work
String id_j = npj.getId();
int[] vmach_j = vshares.get(id_j);
if (free >= vmach_j.length) { // UIMA-5605 Avoid out-of-bounds exception
logger.warn(methodName, null, "Nodepool",id_j,"has a maximum size of", vmach_j.length-1,
"but the computed free space is", free);
free = vmach_j.length - 1;
}
logger.trace(methodName, null, "Update v before: NP[", id_j, "] v:", fmtArray(vmach_j));
vmach_j[free]++; // This is the largest potential share that can be made on this machine,
// after evictions starting 'here' and propagating up
logger.trace(methodName, null, "Update v after : NP[", id_j, "] v:", fmtArray(vmach_j));
}
}
}
}
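// For example (illustrative pool names): if a machine in a leaf pool "np-child" frees up 3 quanta,
// the order-3 slot of vshares is bumped for "np-child" and for every ancestor pool up to the global
// nodepool, so that work constrained to any enclosing pool also sees the hole.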
for ( int npi = 0; npi < allPools.length; npi++ ) {
// Next pass, create the cumulative "n" style table from the "v" table of holes
String id = allPools[npi].getId();
int[] vmach = vshares.get(id);
int[] nmach = nshares.get(id);
reworknShares(vmach, nmach); // Populate nmach from vmach for this np, with free or potentially free shares
if ( logger.isInfo() ) {
logger.info(methodName, null, "NP", id, "After check: virtual free Space", fmtArray(vmach));
logger.info(methodName, null, "NP", id, "After check: cumulative free Space", fmtArray(nmach));
}
}
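// Illustrative example: a pool whose only holes are one of order 4 and one of order 2 has
// vmach = [0, 0, 1, 0, 1]; assuming each hole can be carved into as many smaller shares as fit,
// the cumulative table is nmach = [0, 6, 3, 1, 1] (six order-1, three order-2, one order-3, one order-4).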
for ( int npi = 0; npi < allPools.length; npi++ ) {
// Last step, traverse jobs by nodepool and determine their need.
String id = allPools[npi].getId();
Map<IRmJob, Integer> jobmap = jobs.get(id);
if ( jobmap.size() == 0 ) continue; // only looking at potentially needy jobs
int[] nmach = nshares.get(id);
for ( IRmJob j : jobmap.keySet() ) {
int needed = jobmap.get(j);
int order = j.getShareOrder();
int available = nmach[order];
int to_remove = 0;
needyJobs.put(j, j);
if ( available >= needed ) {
to_remove = needed;
needed = 0;
} else {
to_remove = available;
needed -= to_remove;
}
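// Example: an order-2 job still needing 5 shares when only 3 order-2 holes are available takes
// to_remove=3 and keeps needed=2; the holes are then deducted below from this pool and its
// ancestors so later jobs cannot claim the same space, and the residual 2 marks it ACTUALLY NEEDY.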
if ( to_remove > 0 ) {
NodePool np = allPools[npi];
for ( NodePool npj = np; npj != null; npj = npj.getParent() ) { // must propagate up because of how these tables work
String id_j = npj.getId();
int[] vmach_j = vshares.get(id_j);
int[] nmach_j = nshares.get(id_j);
removeSharesByOrder(vmach_j, nmach_j, to_remove, order);
}
}
if ( needed > 0 ) {
needed_by_job.put(j, needed);
logger.info(methodName, j.getId(),
String.format("NP: %10s User: %10s Pure fs: %3d needed %3d O[%d] %s",
id,
j.getUser().getName(),
j.getPureFairShare(),
needed,
order,
"ACTUALLY NEEDY"
));
}
}
}
}