protected void apportion_qshares()

in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java [387:676]


    /**
     * Apportions the available shares among the given entities in repeated weighted
     * "fair share" passes, then hands out whatever is left as bonus shares.
     *
     * <p>Outline, as implemented below:
     * <ol>
     *   <li>Sort {@code entities} in place with the apportionment sorter supplied by the first entity.</li>
     *   <li>Give every entity a fresh given-by-order array (installed with {@code setGivenByOrder}).</li>
     *   <li>Repeat passes until a pass hands out nothing: compute each working entity's deserved
     *       amount from {@code nshares[1]} and its share weight, then walk the orders from
     *       {@code maxorder} down to 1 giving each entity what it wants, bounded by what it
     *       deserves, its cap, and what is physically left at that order.  After each pass,
     *       entities that want nothing more, have hit their cap, or cannot use any remaining
     *       resource are dropped from the working set.</li>
     *   <li>Finally, while {@code nshares[1] > 0}, give at most one bonus share per order to each
     *       entity that reports it can use one.</li>
     * </ol>
     *
     * <p>Index 0 of the given-by-order and wanted-by-order arrays is maintained here as a running
     * total alongside the per-order slots.  The arrays returned by {@code getWantedByOrder()} are
     * decremented in place as shares are taken.
     *
     * @param entities the entities to apportion among; sorted in place.  If empty, the method
     *                 returns without doing anything further.
     * @param vshares  share counts indexed by order; passed to {@code reworknShares} and
     *                 {@code removeSharesByOrder} together with the local {@code nshares} array
     *                 (presumably both are updated by those helpers - confirm against their definitions).
     * @param descr    descriptive tag passed through to every log call.
     */
    protected void apportion_qshares(List<IEntity> entities, int[] vshares, String descr)
    {
        String methodName = "apportion_qshares";
        boolean shares_given = false;
        int maxorder = globalNodepool.getMaxOrder();
        int[] nshares = globalNodepool.makeArray();           // nshares

        if ( entities.isEmpty() ) return;
        Collections.sort(entities, entities.get(0).getApportionmentSorter());

        reworknShares(vshares, nshares);

        // Entities still participating in the fair-share passes; shrinks as entities are satisfied.
        ArrayList<IEntity> working = new ArrayList<IEntity>(entities);

        HashMap<IEntity, int[]> given_by_order = new HashMap<IEntity, int[]>();
        HashMap<IEntity, Integer>   deserved   = new HashMap<IEntity, Integer>();      // qshares

        // This section dealing with RmCounter writes the counting parameters to the log in a form
        // that can be cut/pasted into a java properties file.  This file can then be used in the
        // RmCounter test/development application.  It also turns out to be very useful in the log
        // in general so it is promoted to 'info' level.
        StringBuilder enames   = new StringBuilder();
        StringBuilder eweights = new StringBuilder();
        logger.info(methodName, null, descr, "RmCounter Start");
        logger.info(methodName, null, descr, "maxorder = ", globalNodepool.getMaxOrder());

        for ( IEntity e : working ) {
            // Every entity starts this apportionment with an empty given-by-order array.
            int[] gbo = globalNodepool.makeArray();
            e.setGivenByOrder(gbo);

            given_by_order.put(e, gbo);
            deserved.put(e, 0);

            // jrc e.initWantedByOrder();

            enames.append(e.getName());
            enames.append(" ");
            eweights.append(Integer.toString(e.getShareWeight()));
            eweights.append(" ");
        }

        logger.info(methodName, null, descr, "entity_names = ", enames.toString());
        logger.info(methodName, null, descr, "weights      = ", eweights.toString());
        for ( IEntity e : working ) {
            logger.info(methodName, null, descr, "wantedby." + e.getName() + " = ", fmtArray(e.getWantedByOrder()));
        }
        logger.info(methodName, null, descr, "vmachines =", fmtArray(vshares));
        logger.info(methodName, null, descr, "RmCounter End");


        int pass = 0;
        do {
            // Starting at highest order, give full fair share to any entity that wants it, minus the
            // shares already given.  Remove the newly given shares and trickle down the fragments.

            logger.trace(methodName, null, descr, "----------------------- Pass", (pass++), "------------------------------");
            logger.trace(methodName, null, descr, "vshares", fmtArray(vshares));
            logger.trace(methodName, null, descr, "nshares", fmtArray(nshares));
            shares_given = false;
            HashMap<IEntity, Integer> given_per_round = new HashMap<IEntity, Integer>();        // qshares
            int allweights = 0;
            for ( IEntity e : working ) {
                allweights += e.getShareWeight();
                given_per_round.put(e, 0);
            }

            //
            // work out deserved for everybody based on what's still around.
            //
            int all_qshares = nshares[1];
            for ( IEntity e : working ) {
                // Weighted fraction of nshares[1]; the integer part is the base fair share.
                double d_base_fs = nshares[1] * ( (double) e.getShareWeight() / allweights );
                int base_fs = (int) Math.floor(d_base_fs);
                deserved.put(e, base_fs);
                all_qshares -= base_fs;

                logger.trace(methodName, null, descr, e.getName(), "Wanted  :", fmtArray(e.getWantedByOrder()));
                logger.trace(methodName, null, descr, e.getName(), "deserved:", base_fs, d_base_fs);
            }

            // Whatever the floor() above left undistributed is handed out one share at a time,
            // in sort order, until it is gone.
            logger.trace(methodName, null, descr,  "Leftover after giving deserved:" + all_qshares);
            if ( all_qshares > 0 ) {
                for ( IEntity e: working ) {
                    deserved.put(e, deserved.get(e) + 1);
                    all_qshares--;
                    if ( all_qshares == 0 ) break;
                }
            }
            for ( IEntity e : working ) {
                logger.trace(methodName, null, descr, String.format("Final deserved by %15s: int[%3d] (after bonus)", e.getName(), deserved.get(e)));
            }

            for ( int o = maxorder; o > 0; o--) {
                int total_taken = 0;                                 // nshares
                if ( nshares[o] == 0 ) {
                    logger.trace(methodName, null, descr, "O " + o + " no shares to give, moving on.");
                    continue;
                }
                for ( IEntity e : working ) {
                    int[] wbo = e.getWantedByOrder();                 // processes - NShares
                    int[] gbo = given_by_order.get(e);                //             NShares

                    if ( wbo[o] == 0 ) {
                        logger.trace(methodName, null, descr, "O", o, "Entity", e.getName(), "nothing wanted at this order, moving on.");
                        continue;
                    }

                    double dgiven = nshares[o] * ((double) e.getShareWeight() / allweights) * o;     // QShares for base calcs
                    int    des = deserved.get(e);                                                    // total deserved this round QShares
                    int    gpr = given_per_round.get(e);                                             // total given this round
                    int    mpr = Math.max(0, des-gpr);                                               // max this round, deserved less what I already was given
                    // UIMA-4275, floor to ceil.  Below with given and rgiven we deal with the remainder also.  The floor plus the residual below
                    //            seemed really aggressive and some small allocations were working out to 0 when they shouldn't
                    int    tgiven = Math.min(mpr, (int) Math.ceil(dgiven));                          // what is calculated, capped by what I already have
                    int    cap = e.calculateCap();                                                   // get caps, if any (simplified in UIMA-4275)
                    logger.trace(methodName, null, descr, "O", o, ":", e.getName(), "Before caps, given", tgiven, "cap", cap);

                    // NOTE(review): the original comment described the cap as being in qshares, but it is
                    // compared against gbo[0], which accumulates counts of N-shares taken - confirm units.
                    if ( gbo[0] >= cap ) {           // UIMA-4275
                        logger.trace(methodName, null, descr, "O", o, "Entity", e.getName(), "cap prevents further allocation.");
                        continue;
                    }

                    int    given = tgiven / o;                                                       // tentatively given, back to NShares
                    int    rgiven = tgiven % o;                                                      // residual - remainder
                    // A non-zero residual that would otherwise round down to nothing is rounded up
                    // to a single share, if one physically exists at this order (UIMA-3664).
                    // (An earlier version only did this when under the fragmentation threshold.)
                    if ( (rgiven > 0) && ( given == 0) ) {
                        given = Math.min( 1, nshares[o] );
                    }

                    if ( given + gbo[0] > cap ) {                                                    // adjust for caps
                        given = Math.max(0, cap - gbo[0]);
                    }

                    int    taken = Math.min(given, wbo[o]);                                          // NShares
                    taken = Math.min(taken, nshares[o] - total_taken);                               // capped on physical (in case rounding overcommitted)

                    logger.trace(methodName, null, descr,
                                 "O", o, ":", e.getName(), "After  caps,",
                                 " dgiven Q[", dgiven,
                                 "] given N[", given ,
                                 "] taken N[", taken ,
                                 "]");

                    // Book-keeping: per-order slot and the running total in slot 0.
                    gbo[o] += taken;
                    gbo[0] += taken;
                    wbo[o] -= taken;
                    wbo[0] -= taken;
                    total_taken += taken;
                    given_per_round.put(e, given_per_round.get(e) + (taken*o));
                }
                if ( total_taken > 0 ) shares_given = true;
                removeSharesByOrder(vshares, nshares, total_taken, o);

                // If you were given all you deserve this round, then pull the weight so you don't
                // dilute the giving for shares you aren't owed.
                //
                // NOTE(review): this runs after every order that had shares, and nothing records which
                // entities were already pulled, so a satisfied entity's weight is subtracted again on each
                // later order.  Left as-is to preserve existing scheduling results - confirm intent.
                for ( IEntity e : working ) {
                    int des = deserved.get(e);
                    int gpr = given_per_round.get(e);
                    int still_owed = Math.max(0, des - gpr);
                    if ( still_owed == 0 ) {
                        allweights -= e.getShareWeight();
                    }
                }
                if ( allweights <=0 ) break;   // JRC JRC
            }

            // Remove entities that have everything they want or could otherwise get
            Iterator<IEntity> iter = working.iterator();
            while ( iter.hasNext() ) {
                IEntity e = iter.next();
                if ( (e.getWantedByOrder()[0] == 0) || (e.getGivenByOrder()[0] >= e.calculateCap()) ) {      // UIMA-4275, checking fair-share cap
                    iter.remove();
                }
            }

            // Remove entities that can't get anything more.  This is important to get better convergence - otherwise
            // the "spectator" jobs will pollute the fair-share and convergence will be much harder to achieve.
            iter = working.iterator();
            while ( iter.hasNext() ) {
                IEntity e = iter.next();
                int[] wbo = e.getWantedByOrder();
                boolean purge = true;
                for ( int o = maxorder; o > 0; o-- ) {
                    if ( (wbo[o] > 0) && (nshares[o] > 0) ) {   // if wants something, and resources still exist for it ...
                        purge = false;                          // then no purge
                        break;
                    }
                }
                if ( purge ) {
                    iter.remove();
                }
            }

            if ( logger.isTrace() ) {
                logger.trace(methodName, null, descr, "Survivors at end of pass:");
                for ( IEntity e : working ) {
                    logger.trace(methodName, null, descr, e.toString());
                }
            }
        } while ( shares_given );

        // Emitted at info level, but only when trace is enabled.
        if ( logger.isTrace() ) {
            logger.info(methodName, null, descr, "Final before bonus:");
            for ( IEntity e : entities ) {
                int[] gbo = e.getGivenByOrder();
                logger.info(methodName, null, descr, String.format("%12s %s", e.getName(), fmtArray(gbo)));
            }
        }

        //
        // A final pass, in case something was left behind due to rounding.
        // These are all bonus shares.  We'll give preference to the "oldest"
        // entities.  But only one extra per pass, in order to evenly distribute.
        //
        // Technically, we might want to distribute them according to the
        // entity weights, but we're not doing that (assuming all weights are 1).
        //
        boolean bonus_given = true;
        while ( (nshares[1] > 0) && (bonus_given)) {
            bonus_given = false;
            for ( IEntity e : entities ) {
                int[] gbo = e.getGivenByOrder();          // nshares

                for ( int o = maxorder; o > 0; o-- ) {
                    // the entity accesses its wbo, gbo, and entity-specific knowledge to decide whether
                    // the bonus is usable.  if so, we give out exactly one in an attempt to spread the wealth.
                    //
                    // An example of where you can't use it is a class over a nodepool whose resources
                    // are exhausted, in which case we'd loop and see if anybody else was game.
                    // UIMA-4065
                    if ( (e.canUseBonus(o) ) && (vshares[o] > 0) ) {
                        gbo[o]++;
                        gbo[0]++;
                        removeSharesByOrder(vshares, nshares, 1, o);
                        bonus_given = true;
                    }
                }

                // UIMA-4605 - old 'bonus' code -- keep for a while as quick reference
                // for ( int o = maxorder; o > 0; o-- ) {
                //     int canuse = wbo[o] - gbo[o];
                //     while ( (canuse > 0 ) && (vshares[o] > 0) ) {
                //         gbo[o]++;
                //         //bonus++;
                //         canuse = wbo[o] - gbo[o];
                //         removeSharesByOrder(vshares, nshares, 1, o);
                //         given = true;
                //         break;
                //     }
                // }
            }
        }

        logger.debug(methodName, null, descr, "Final apportionment:");
        for ( IEntity e : entities ) {
            int[] gbo = e.getGivenByOrder();          // nshares
            logger.debug(methodName, null, descr, String.format("%12s gbo %s", e.getName(), fmtArray(gbo)));
        }
        logger.debug(methodName, null, descr, "vshares", fmtArray(vshares));
        logger.debug(methodName, null, descr, "nshares", fmtArray(nshares));
    }