in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java [387:676]
protected void apportion_qshares(List<IEntity> entities, int[] vshares, String descr)
{
String methodName = "apportion_qshares";
boolean shares_given = false;
int maxorder = globalNodepool.getMaxOrder();
int[] nshares = globalNodepool.makeArray(); // nshares
if ( entities.size() == 0 ) return;
Collections.sort(entities, entities.get(0).getApportionmentSorter());
reworknShares(vshares, nshares);
ArrayList<IEntity> working = new ArrayList<IEntity>();
working.addAll(entities);
HashMap<IEntity, int[]> given_by_order = new HashMap<IEntity, int[]>();
HashMap<IEntity, Integer> deserved = new HashMap<IEntity, Integer>(); // qshares
// This section dealing with RmCounter writes the counting parameters to the log in a form
// that can be cut/pasted into a java properties file. This file can then be used in the
// RmCounter test/deveopment application. It also turns out to be very useful in the log
// in general so it is promoted to 'info' level.
StringBuffer enames = null;
StringBuffer eweights = null;
logger.info(methodName, null, descr, "RmCounter Start");
logger.info(methodName, null, descr, "maxorder = ", globalNodepool.getMaxOrder());
enames = new StringBuffer();
eweights = new StringBuffer();
for ( IEntity e : working ) {
int[] gbo = globalNodepool.makeArray();
e.setGivenByOrder(gbo);
given_by_order.put(e, gbo);
deserved.put(e, 0);
// jrc e.initWantedByOrder();
enames.append(e.getName());
enames.append(" ");
eweights.append(Integer.toString(e.getShareWeight()));
eweights.append(" ");
}
logger.info(methodName, null, descr, "entity_names = ", enames.toString());
logger.info(methodName, null, descr, "weights = ", eweights.toString());
for ( IEntity e : working ) {
logger.info(methodName, null, descr, "wantedby." + e.getName() + " = ", fmtArray(e.getWantedByOrder()));
}
logger.info(methodName, null, descr, "vmachines =", fmtArray(vshares));
logger.info(methodName, null, descr, "RmCounter End");
int pass = 0;
do {
// Starting at highest order, give full fair share to any entity that wants it, minus the
// shares already given. Remove the newly given shares and trickle down the fragments.
logger.trace(methodName, null, descr, "----------------------- Pass", (pass++), "------------------------------");
logger.trace(methodName, null, descr, "vshares", fmtArray(vshares));
logger.trace(methodName, null, descr, "nshares", fmtArray(nshares));
shares_given = false;
HashMap<IEntity, Integer> given_per_round = new HashMap<IEntity, Integer>(); // qshares
int allweights = 0;
for ( IEntity e : working ) {
allweights += e.getShareWeight();
given_per_round.put(e, 0);
}
//
// work out deserved for everybody based on what's still around.
//
int all_qshares = nshares[1];
for ( IEntity e : working ) {
int base_fs = (int) Math.floor(nshares[1] * ( (double) e.getShareWeight() / allweights ));
double d_base_fs = nshares[1] * ( (double) e.getShareWeight() / allweights );
deserved.put(e, base_fs);
all_qshares -= base_fs;
logger.trace(methodName, null, descr, e.getName(), "Wanted :", fmtArray(e.getWantedByOrder()));
logger.trace(methodName, null, descr, e.getName(), "deserved:", base_fs, d_base_fs);
}
logger.trace(methodName, null, descr, "Leftover after giving deserved:" + all_qshares);
if ( all_qshares > 0 ) {
for ( IEntity e: working ) {
deserved.put(e, deserved.get(e) + 1);
all_qshares--;
if ( all_qshares == 0 ) break;
}
}
for ( IEntity e : working ) {
logger.trace(methodName, null, descr, String.format("Final deserved by %15s: int[%3d] (after bonus)", e.getName(), deserved.get(e)));
}
for ( int o = maxorder; o > 0; o--) {
int total_taken = 0; // nshares
if ( nshares[o] == 0 ) {
logger.trace(methodName, null, descr, "O " + o + " no shares to give, moving on.");
continue;
}
for ( IEntity e : working ) {
int[] wbo = e.getWantedByOrder(); // processes - NShares
int[] gbo = given_by_order.get(e); // NShares
if ( wbo[o] == 0 ) {
logger.trace(methodName, null, descr, "O", o, "Entity", e.getName(), "nothing wanted at this order, moving on.");
continue;
}
double dgiven = nshares[o] * ((double) e.getShareWeight() / allweights) * o; // QShares for base calcs
int des = deserved.get(e); // total deserved this round QShares
int gpr = given_per_round.get(e); // total given this round
int mpr = Math.max(0, des-gpr); // max this round, deserved less what I aleady was given
//int tgiven = Math.min(mpr, (int) Math.floor(dgiven)); // what is calculated, capped by what I alreay have
// UIMA-4275, floor to ciel. Below with tgiven and rgiven we deal with ther emainder also. The floor plus the residual below
// seemed really agressive and some small allocation were working out to 0 when they shouldn't
int tgiven = Math.min(mpr, (int) Math.ceil(dgiven)); // what is calculated, capped by what I alreay have
int cap = e.calculateCap(); // get caps, if any, in qshares (simplified in UIMA-4275)
logger.trace(methodName, null, descr, "O", o, ":", e.getName(), "Before caps, given", tgiven, "cap", cap);
if ( gbo[0] >= cap ) { // UIMA-4275
logger.trace(methodName, null, descr, "O", o, "Entity", e.getName(), "cap prevents further allocation.");
continue;
}
int given = tgiven / o; // tentatively given, back to NShares
int rgiven = tgiven % o; // residual - remainder
//int twanted = wbo[0] + gbo[0]; // actual wanted: still wanted plus alredy given
// if ( twanted <= fragmentationThreshold ) { // if under the defrag limit, round up
if ( (rgiven > 0) && ( given == 0) ) {
given = Math.min( ++given, nshares[o] ); // UIMA-3664
}
// } // if not under the defrag limit, round down
if ( given + gbo[0] > cap ) { // adjust for caps
given = Math.max(0, cap - gbo[0]);
}
int taken = Math.min(given, wbo[o]); // NShares
taken = Math.min(taken, nshares[o] - total_taken); // cappend on physical (in case rounding overcommitted)
logger.trace(methodName, null, descr,
"O", o, ":", e.getName(), "After caps,",
" dgiven Q[", dgiven,
"] given N[", given ,
"] taken N[", taken ,
"]");
gbo[o] += taken;
gbo[0] += taken;
wbo[o] -= taken;
wbo[0] -= taken;
total_taken += taken;
given_per_round.put(e, given_per_round.get(e) + (taken*o));
}
if ( total_taken > 0 ) shares_given = true;
removeSharesByOrder(vshares, nshares, total_taken, o);
// If you were given all you deserve this round, then pull the weight so you don't
// dilute the giving for shares you aren't owed.
Iterator<IEntity> iter = working.iterator();
while ( iter.hasNext() ) {
IEntity e = iter.next();
int des = deserved.get(e);
int gpr = given_per_round.get(e);
int got_all = Math.max(0, des - gpr);
if ( got_all == 0 ) {
allweights -= e.getShareWeight();
}
}
if ( allweights <=0 ) break; // JRC JRC
}
// Remove entities that have everything they want or could otherwise get
Iterator<IEntity> iter = working.iterator();
while ( iter.hasNext() ) {
IEntity e = iter.next();
if ( (e.getWantedByOrder()[0] == 0) || (e.getGivenByOrder()[0] >= e.calculateCap()) ) { // UIMA-4275, checking fair-share cap
// logger.info(methodName, null, descr, e.getName(), "reaped, nothing more wanted:", fmtArray(e.getWantedByOrder()));
iter.remove();
}
}
// Remove entities that can't get anythng more. This is important to get better convergence - otherwise
// the "spectator" jobs will pollute the fair-share and convergence will be much harder to achieve.
iter = working.iterator();
while ( iter.hasNext() ) {
IEntity e = iter.next();
int[] wbo = e.getWantedByOrder();
boolean purge = true;
for ( int o = maxorder; o > 0; o-- ) {
if ( (wbo[o] > 0) && (nshares[o] > 0) ) { // if wants something, and resources still exist for it ...
purge = false; // then no purge
break;
}
}
if ( purge ) {
//logger.info(methodName, null, descr, e.getName(), "reaped, nothing more usablee:", fmtArray(e.getWantedByOrder()), "usable:",
// fmtArray(nshares));
iter.remove();
}
}
if ( logger.isTrace() ) {
logger.trace(methodName, null, descr, "Survivors at end of pass:");
for ( IEntity e : working ) {
logger.trace(methodName, null, descr, e.toString());
}
}
} while ( shares_given );
if ( logger.isTrace() ) {
logger.info(methodName, null, descr, "Final before bonus:");
for ( IEntity e : entities ) {
int[] gbo = e.getGivenByOrder();
logger.info(methodName, null, descr, String.format("%12s %s", e.getName(), fmtArray(gbo)));
}
}
//
// A final pass, in case something was left behind due to rounding.
// These are all bonus shares. We'll give preference to the "oldest"
// entities. But only one extra per pass, in order to evenly distribute.
//
// Technically, we might want to distribute the according to the
// entity weights, but we're not doing that (assuming all weights are 1).
//
boolean given = true;
//int bonus = 0;
while ( (nshares[1] > 0) && (given)) {
given = false;
for ( IEntity e : entities ) {
//int[] wbo = e.getWantedByOrder(); // nshares
int[] gbo = e.getGivenByOrder(); // nshares
for ( int o = maxorder; o > 0; o-- ) {
// the entity access its wbo, gbo, and entity-specific knowledge to decide whether
// the bonus is usable. if so, we give out exactly one in an attempt to spread the wealth.
//
// An example of where you can't use, is a class over a nodepool whose resources
// are exhausted, in which case we'd loop and see if anybody else was game.
// UIMA-4065
while ( (e.canUseBonus(o) ) && (vshares[o] > 0) ) {
gbo[o]++;
gbo[0]++;
removeSharesByOrder(vshares, nshares, 1, o);
given = true;
break;
}
}
// UIMA-4605 - old 'bonus' code -- keep for a while as quick reference
// for ( int o = maxorder; o > 0; o-- ) {
// int canuse = wbo[o] - gbo[o];
// while ( (canuse > 0 ) && (vshares[o] > 0) ) {
// gbo[o]++;
// //bonus++;
// canuse = wbo[o] - gbo[o];
// removeSharesByOrder(vshares, nshares, 1, o);
// given = true;
// break;
// }
// }
}
}
logger.debug(methodName, null, descr, "Final apportionment:");
for ( IEntity e : entities ) {
int[] gbo = e.getGivenByOrder(); // nshares
logger.debug(methodName, null, descr, String.format("%12s gbo %s", e.getName(), fmtArray(gbo)));
}
logger.debug(methodName, null, descr, "vshares", fmtArray(vshares));
logger.debug(methodName, null, descr, "nshares", fmtArray(nshares));
// if ( bonus > 0 ) {
// logger.debug(methodName, null, descr, "Final after bonus:");
// for ( IEntity e : entities ) {
// int[] gbo = e.getGivenByOrder(); // nshares
// logger.debug(methodName, null, descr, String.format("%12s %s", e.getName(), fmtArray(gbo)));
// }
// logger.debug(methodName, null, descr, "vshares", fmtArray(vshares));
// logger.debug(methodName, null, descr, "nshares", fmtArray(nshares));
// } else {
// logger.debug(methodName, null, descr, "No bonus to give.");
// }
}