in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java [724:843]
/**
 * Marks shares of this job for eviction in a single nodepool, taking candidates in the
 * order produced by {@code ShareByInvestmentSorter}.
 *
 * Only assigned shares that live in {@code nodepool} and are not already evicted are
 * considered. Each round looks at the first remaining candidate, starts from the number
 * of already freed-up shares on its machine, and adds this job's other candidates on that
 * same machine until at least {@code order} can be freed. If that threshold is reached,
 * the collected shares are placed in {@code pendingRemoves}, evicted, and dropped from the
 * candidate list.
 *
 * @param shares   number of shares of size {@code order} being requested; must be positive
 * @param order    share order (size) the requester needs on one machine
 * @param force    when true the whole of {@code countNShares()} may be lost; otherwise only
 *                 the excess of {@code countNShares()} over {@code countNSharesGiven()}
 * @param nodepool nodepool from which shares may be taken
 * @return number of shares of the requested order given up; this is not necessarily the
 *         number of this job's processes that were evicted
 * @throws SchedulingException if {@code shares} is zero or negative
 */
public int shrinkByInvestment(int shares, int order, boolean force, NodePool nodepool)
{
    String methodName = "shrinkByInvestment";

    if ( shares <= 0 ) {
        throw new SchedulingException(getId(), "Trying to shrink by " + shares + " shares.");
    }

    logger.debug(methodName, getId(), "Enter: shares", shares, "order", order, "force", force, "nodepool", nodepool.getId(),
                 "nAssignedShares", assignedShares.size(), "nPendingShares", pendingShares.size());

    // Candidate set: shares in the requested nodepool that have not been evicted yet.
    ArrayList<Share> candidates = new ArrayList<Share>();
    for ( Share held : assignedShares.values() ) {
        boolean usable = held.getNodepoolId().equals(nodepool.getId()) && !held.isEvicted();
        if ( usable ) {
            candidates.add(held);
            continue;
        }
        if ( logger.isTrace() ) {
            logger.trace(methodName, getId(), "Skipping", held.getId(), "s.nodepool", held.getNodepoolId(), "incoming.nodepool", nodepool.getId(), "evicted", held.isEvicted());
        }
    }

    if ( candidates.isEmpty() ) {
        return 0;
    }

    if ( logger.isTrace() ) {
        logger.trace(methodName, getId(), "Shares Before Sort - id, isInitialized, investment:");
        for ( Share c : candidates ) {
            logger.trace(methodName, getId(), c.getId(), c.isInitialized(), c.getInvestment());
        }
    }

    Collections.sort(candidates, new ShareByInvestmentSorter());

    if ( logger.isTrace() ) {
        logger.trace(methodName, getId(), "Shares After Sort - id, isInitialized, investment:");
        for ( Share c : candidates ) {
            logger.trace(methodName, getId(), c.getId(), c.isInitialized(), c.getInvestment());
        }
    }

    // Budget of processes this job may lose: everything when forced, otherwise only the
    // surplus over what has been given. Never more than the number of candidates.
    int evictionBudget = force
        ? countNShares()
        : Math.max(0, countNShares() - countNSharesGiven());
    evictionBudget = Math.min(evictionBudget, candidates.size());
    if ( evictionBudget == 0 ) {
        return 0;
    }

    int ordersGiven = 0;        // shares of the requested order handed back so far
    int processesEvicted = 0;   // processes of this job evicted so far

    while ( (ordersGiven < shares) && (processesEvicted < evictionBudget) ) {
        int evictedBeforeRound = processesEvicted;

        if ( logger.isTrace() ) {
            logger.trace(methodName, getId(), "In loop: Shares given", ordersGiven, "shares wanted", shares,
                         "processes_to_lose", evictionBudget, "processes_given", processesEvicted);
        }

        Share lead = candidates.get(0);
        Machine host = lead.getMachine();
        int freeable = host.countFreedUpShares();
        logger.debug(methodName, getId(), "Inspecting share", lead.getId());

        // Walk the candidates from the front, collecting those on the lead share's machine
        // until the machine could free at least one share of the requested order.
        ArrayList<Share> onSameMachine = new ArrayList<Share>();
        for ( int i = 0; (freeable < order) && (i < candidates.size()); i++ ) {
            Share other = candidates.get(i);
            if ( other.getMachine() == lead.getMachine() ) {
                onSameMachine.add(other);
                freeable += other.getShareOrder();
            }
        }

        if ( freeable >= order ) {
            // Enough can be freed on this machine to be useful, so evict what was collected.
            for ( Share victim : onSameMachine ) {
                logger.info(methodName, getId(), "Removing share", victim.toString());
                pendingRemoves.put(victim, victim);
                victim.evict();
                candidates.remove(victim);
                processesEvicted++;
                if ( processesEvicted >= evictionBudget ) {
                    // Budget exhausted; a partial eviction here is left for defrag to sort out.
                    break;
                }
            }
            ordersGiven += (freeable / order);
        }

        // A round that evicted nothing left the candidate list unchanged, so another pass
        // would inspect the same lead share again. Stop here to avoid looping forever.
        if ( processesEvicted == evictedBeforeRound ) {
            logger.debug(methodName, getId(), "Gave no shares, breaking loop");
            break;
        }
    }

    return ordersGiven;
}