in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java [1924:2101]
/**
 * Attempts to make space for the needy job {@code nj} by evicting ("defragging") shares
 * that belong to richer users (the candidate jobs collected in {@code jobs_by_user}).
 *
 * Algorithm:
 *   1. Collect all machines in the job's nodepool that, after evicting forceable shares
 *      owned by candidate jobs, could hold at least one process of the job's share order.
 *   2. Order those machines by richest resident user, then by size (EligibleMachineSorter).
 *   3. Walk the machines, clearing just enough shares on one machine per round; after a
 *      successful round, re-insert the machines so the comparator re-orders them, since
 *      user wealth has changed.
 *   4. Repeat until the request is satisfied or nothing more can be given.
 *
 * @param nj              the needy job we are trying to find space for
 * @param needed          number of processes (of the job's share order) still required
 * @param users_by_wealth users ordered by wealth; NOTE(review): not referenced in this
 *                        method body — kept for the defragmentation call contract
 * @param jobs_by_user    per-user maps of jobs whose shares may be reclaimed
 * @return the number of processes we made space for
 */
int takeFromTheRich(IRmJob nj,
                    int needed,
                    TreeMap<User, User> users_by_wealth,
                    HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user)
{
    String methodName = "takeFromTheRich";
    // 1. Collect all machines that have shares, which if evicted, would make enough space
    //    - in compatible NP
    //    - g + sum(shares belonging to rich users on the machine);
    // 2. Order the machines by
    //    a) richest user
    //    b) largest machine
    // 3. Pick next machine,
    //    - clear enough shares
    //    - remove machine from list
    //    - update wealth
    // 4. Repeat at 2 until
    //    a) have given what is needed
    //    b) nothing left to give

    // Flatten the per-user job maps into a single lookup set of reclaimable jobs.
    Map<IRmJob, IRmJob> candidateJobs = new HashMap<IRmJob, IRmJob>();
    Map<Machine, Machine> eligibleMachines = new TreeMap<Machine, Machine>(new EligibleMachineSorter());

    for ( TreeMap<IRmJob, IRmJob> jobs : jobs_by_user.values() ) {
        candidateJobs.putAll(jobs);
    }

    int given = 0;
    int orderNeeded = nj.getShareOrder();

    ResourceClass cl = nj.getResourceClass();          // needy job's resource class
    String npname = cl.getNodepoolName();              // name of the class
    NodePool np = globalNodepool.getSubpool(npname);   // job's nodepool
    Map<Node, Machine> machines = np.getAllMachines(); // everything here is a candidate, nothing else is
                                                       // this is the machines in the pool, and all the
                                                       // subpools

    // Here we filter all the machines looking for machines that *might* be able to satisfy the defrag. At the
    // end this set of machines is eligibleMachines.
    machine_loop :
    for ( Machine m : machines.values() ) {

        if ( m.getShareOrder() < orderNeeded ) {       // nope, too small
            logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": too small for request of order", orderNeeded);
            continue;
        }

        // if the job is a reservation the machine size has to match, and machine must be clearable
        if ( nj.getSchedulingPolicy() == Policy.RESERVE ) {
            if ( m.getShareOrder() != orderNeeded ) {
                logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": RESERVE policy requires exact match for order", orderNeeded);
                continue;
            }
            // machine must be clearable as well: every resident share must belong to a candidate job
            Collection<Share> shares = m.getActiveShares().values();
            for ( Share s : shares ) {
                if ( ! candidateJobs.containsKey(s.getJob()) ) {
                    logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": for reservation, machine contains non-candidate job", s.getJob().getId());
                    continue machine_loop;
                }
            }
        }

        Map<Share, Share> as = m.getActiveShares();    // everything allocated here
        int g = m.getVirtualShareOrder();              // g is space that we might be able to make after defrag:
                                                       // free space + freeable-from-candidates
        for ( Share s : as.values() ) {
            IRmJob j = s.getJob();
            if ( s.isForceable() && candidateJobs.containsKey(j) ) { // evictable, and a candidate for reclamation by defrag
                g += j.getShareOrder();
            }
        }

        if ( g >= orderNeeded ) {                      // if it's usable by the job, it's a candidate
            logger.info(methodName, nj.getId(), "Candidate machine:", m.getId());
            eligibleMachines.put(m, m);
        } else {
            logger.info(methodName, nj.getId(), "Not a candidate, insufficient free space + candidate shares:", m.getId());
        }
    }

    // Now eligibleMachines is the set of candidate machines for defrag
    logger.info(methodName, nj.getId(), "Found", eligibleMachines.size(), "machines to be searched in this order:");
    StringBuilder buf = new StringBuilder();           // local, single-threaded: StringBuilder, not StringBuffer
    for ( Machine m : eligibleMachines.keySet() ) {
        buf.append(m.getId());
        buf.append(" ");
    }
    logger.info(methodName, nj.getId(), "Eligible machines:", buf.toString());

    // first part done, we know where to look.

    // Now just bop through the machines to see if we can get anything for this specific job (nj)
    int given_per_round = 0;
    do {
        int g = 0;
        given_per_round = 0;
        for ( Machine m : eligibleMachines.keySet() ) {
            //
            // How best to order candidate shares?  You can choose the "wealthiest" first, but if it's not a good
            // match by size, end up evicting too many shares which could include a not-so-wealthy share, or
            // increase frag by breaking it up and leaving a useless bit.
            //
            // So we're going to try ordering shares by "wealthiest", but then if we find an exact match by size,
            // order that to the front of the candidates.  We may not end up evicting the "wealthiest", but we
            // should end up evicting the least disruptive share.
            //
            List<Share> sh = new ArrayList<Share>();
            sh.addAll(m.getActiveShares().values());
            Collections.sort(sh, new ShareByWealthSorter());

            g = m.getVirtualShareOrder();              // ( free space at this point )
            List<Share> potentialShares = new ArrayList<Share>();
            for ( Share s : sh ) {
                IRmJob j = s.getJob();
                // User u = j.getUser();

                if ( s.isForceable() ) {
                    if ( candidateJobs.containsKey(j) ) {
                        g += s.getShareOrder();
                        if ( s.getShareOrder() == orderNeeded ) {
                            potentialShares.add(0, s); // exact matches first
                        } else {
                            potentialShares.add(s);
                        }
                    }
                }
                if ( g >= orderNeeded ) break;
            }

            // potentialShares should be properly ordered as discussed above at this point
            if ( g >= orderNeeded ) {
                // found enough on this machine for 1 share!
                logger.debug(methodName, nj.getId(), "Clearing shares: g[", g, "], orderNeeded[", orderNeeded, "]");
                g = m.getVirtualShareOrder();          // reset, then re-accumulate as shares are actually cleared
                for ( Share s : potentialShares ) {
                    IRmJob j = s.getJob();
                    User u = j.getUser();

                    g += s.getShareOrder();
                    given_per_round++;
                    clearShare(s, nj);
                    u.subtractWealth(s.getShareOrder());
                    logger.debug(methodName, nj.getId(), "Clearing share", s, "order[", s.getShareOrder(),
                                 "]: g[", g, "], orderNeeded[", orderNeeded, "]");
                    if ( g >= orderNeeded) break;      // inner loop, could break on exact match without giving everything away
                }
                break;                                 // outer loop, if anything was found
            }
        }

        if ( given_per_round > 0 ) {
            // Must reorder the eligible list to get the "next" best candidate.  We could try to remove
            // machines that were exhausted above ...
            Map<Machine, Machine> tmp = new HashMap<Machine, Machine>();
            tmp.putAll(eligibleMachines);
            eligibleMachines.clear();
            for ( Machine m : tmp.keySet() ) {
                eligibleMachines.put(m, m);            // re-insert so the comparator sees the updated wealth
            }

            // and also must track how many processes we made space for
            given = given + (g / orderNeeded);         // at least one, or else we have a bug
            logger.debug(methodName, nj.getId(), "LOOPEND: given[", given, "] g[", g, "] orderNeeded[", orderNeeded, "]");
        }
        logger.debug(methodName, nj.getId(), "Given_per_round", given_per_round, "given", given, "needed", needed);
    } while ( (given_per_round > 0) && ( given < needed ));

    // Sometimes we can directly reassign a share, in which case the job isn't waiting any more.
    // We only care about setting a message if the poor thing is still totally starved of resources.
    if ( nj.countNShares() == 0 ) {
        nj.setReason("Waiting for defragmentation.");
    }

    return given;
}