int takeFromTheRich()

in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java [1924:2101]


    /**
     * Defragmentation pass: try to free enough shares from "rich" users' jobs so that the
     * needy job {@code nj} can be given up to {@code needed} processes.
     *
     * Algorithm:
     * <ol>
     *   <li>Collect all machines in the needy job's nodepool that could yield enough space
     *       (free space + forceable shares belonging to candidate rich jobs).</li>
     *   <li>Order those machines by richest user, then largest machine
     *       (via {@code EligibleMachineSorter}).</li>
     *   <li>Repeatedly pick the best machine, clear just enough shares for one allocation,
     *       update user wealth, and re-sort the machine set.</li>
     *   <li>Stop when enough has been given or nothing more can be freed.</li>
     * </ol>
     *
     * @param nj              the needy job we are defragmenting on behalf of
     * @param needed          number of processes (of nj's share order) still required
     * @param users_by_wealth rich users ordered by wealth; currently unused here — the wealth
     *                        ordering is applied per-share via ShareByWealthSorter instead
     * @param jobs_by_user    for each rich user, the jobs whose shares are candidates for eviction
     * @return the number of processes actually made room for (may be less than needed)
     */
    int takeFromTheRich(IRmJob nj, 
                        int needed,
                        TreeMap<User, User> users_by_wealth,
                        HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user)
    {
        String methodName = "takeFromTheRich";
        // 1. Collect all machines that have shares, which if evicted, would make enough space
        //    - in compatible NP
        //    - g + sum(shares belonging to rich users on the machine);
        // 2. Order the machines by 
        //    a) richest user
        //    b) largest machine
        // 3. Pick next machine,
        //    - clear enough shares
        //    - remove machine from list
        //    - update wealth
        // 4. Repeat at 2 until
        //    a) have given what is needed
        //    b) nothing left to give

        Map<IRmJob,  IRmJob>  candidateJobs    = new HashMap<IRmJob,  IRmJob>();
        Map<Machine, Machine> eligibleMachines = new TreeMap<Machine, Machine>(new EligibleMachineSorter());

        // Flatten the per-user job maps into one set of candidate (evictable) jobs.
        for ( TreeMap<IRmJob, IRmJob> jobs : jobs_by_user.values() ) {
            candidateJobs.putAll(jobs);
        }

        int given = 0;
        int orderNeeded = nj.getShareOrder();
        
        ResourceClass cl     = nj.getResourceClass();               // needy job's resource class
        String        npname = cl.getNodepoolName();                // name of the class
        NodePool      np     = globalNodepool.getSubpool(npname);   // job's nodepool
        Map<Node, Machine> machines = np.getAllMachines();          // everything here is a candidate, nothing else is
                                                                    //   this is the machines in the pool, and all the
                                                                    //   subpools

        // Here we filter all the machines looking for machines that *might* be able to satisfy the defrag.  At the 
        // end this set of machines is eligibleMachines.
        machine_loop : 
            for ( Machine m : machines.values() ) {

                if ( m.getShareOrder() < orderNeeded ) {                // nope, too small
                    logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": too small for request of order", orderNeeded); 
                    continue;
                }

                // if the job is a reservation the machine size has to match, and machine must be clearable
                if ( nj.getSchedulingPolicy() == Policy.RESERVE ) {
                    if ( m.getShareOrder() != orderNeeded ) {
                        logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": RESERVE policy requires exact match for order", orderNeeded);
                        continue;
                    }
                    // machine must be clearable as well: every active share must belong to a candidate job
                    Collection<Share> shares = m.getActiveShares().values();
                    for ( Share s : shares ) {
                        if ( ! candidateJobs.containsKey(s.getJob()) ) {
                            logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": for reservation, machine contains non-candidate job", s.getJob().getId());
                            continue machine_loop;
                        }
                    }                
                }

                Map<Share, Share> as = m.getActiveShares();            // everything allocated here
                int g = m.getVirtualShareOrder();                      // g is space that we might be able to make after defrag:
                //    free space + freeable-from-candidates
                for ( Share s : as.values() ) {
                    IRmJob j = s.getJob();
                    if ( s.isForceable() && candidateJobs.containsKey(j) ) {  // evictable, and a candidate for reclamation by defrag
                        g += j.getShareOrder();
                    }
                }

                if ( g >= orderNeeded ) {                              // if it's usable by the job, it's a candidate
                    logger.info(methodName, nj.getId(), "Candidate machine:", m.getId());
                    eligibleMachines.put(m, m);
                } else {
                    logger.info(methodName, nj.getId(), "Not a candidate, insufficient free space + candidate shares:", m.getId());
                }
            }
        
        // Now eligibleMachines is the set of candidate machines for defrag

        logger.info(methodName, nj.getId(), "Found", eligibleMachines.size(), "machines to be searched in this order:");
        StringBuilder buf = new StringBuilder();                       // local, single-threaded: StringBuilder, not StringBuffer
        for ( Machine m : eligibleMachines.keySet() ) {
            buf.append(m.getId());
            buf.append(" ");
        }
        logger.info(methodName, nj.getId(), "Eligible machines:", buf.toString());

        // first part done, we know where to look.

        // Now just bop through the machines to see if we can get anything for this specific job (nj)
        int given_per_round = 0;
        do {
            int g = 0;
            given_per_round = 0;
            for ( Machine m : eligibleMachines.keySet() ) {

                //
                // How best to order candidate shares?  You can choose the "wealthiest" first, but if it's not a good
                // match by size, end up evicting too many shares which could include a not-so-wealthy share, or
                // increase frag by breaking it up and leaving a useless bit.
                //
                // So we're going to try ordering shares by "wealthiest", but then if we find an exact match by size,
                // order that to the front of the candidates.  We may not end up evicting the "wealthiest", but we
                // should end up evicting the least disruptive share.
                //
                List<Share> sh = new ArrayList<Share>();
                sh.addAll(m.getActiveShares().values());
                Collections.sort(sh, new ShareByWealthSorter());

                g = m.getVirtualShareOrder();         // ( free space at this point )
                List<Share> potentialShares     = new ArrayList<Share>();
                for ( Share s : sh ) {
                    IRmJob j = s.getJob();
                    
                    if ( s.isForceable() ) {
                        if ( candidateJobs.containsKey(j) ) {
                            g += s.getShareOrder();
                            if ( s.getShareOrder() == orderNeeded ) {
                                potentialShares.add(0, s);    // exact matches first
                            } else {
                                potentialShares.add(s);
                            }
                        }
                    }
                    if ( g >= orderNeeded ) break;    // collected enough potential space; stop scanning shares
                }
                
                // potentialShares should be properly ordered as discussed above at this point
                if ( g >= orderNeeded ) {
                    // found enough on this machine for 1 share!
                    logger.debug(methodName, nj.getId(), "Clearing shares: g[", g, "], orderNeeded[", orderNeeded, "]");
                    g = m.getVirtualShareOrder();             // reset, then re-accumulate while actually evicting
                    for ( Share s : potentialShares ) {
                        IRmJob j = s.getJob();
                        User u = j.getUser();

                        g += s.getShareOrder();
                        given_per_round++;
                        clearShare(s, nj);                    // evict this share on behalf of nj
                        u.subtractWealth(s.getShareOrder());  // keep wealth ordering current for the re-sort below
                        logger.debug(methodName, nj.getId(), "Clearing share", s, "order[", s.getShareOrder(),
                                     "]: g[", g, "], orderNeeded[", orderNeeded, "]");
                        if ( g >= orderNeeded) break; // inner loop, could break on exact match without giving everything away
                    }
                    break;                            // outer loop, if anything was found
                }       
            }

            if ( given_per_round > 0 ) {
                // Wealth changed above, so the TreeMap's comparator ordering is stale.  Re-insert every
                // machine to re-sort and get the "next" best candidate.  We could also try to remove
                // machines that were exhausted above ...
                Map<Machine, Machine> tmp = new HashMap<Machine, Machine>(eligibleMachines);
                eligibleMachines.clear();
                for ( Machine m : tmp.keySet() ) {
                    eligibleMachines.put(m, m);
                }

                // and also must track how many processes we made space for
                given = given + (g / orderNeeded);    // at least one, or else we have a bug 
                logger.debug(methodName, nj.getId(), "LOOPEND: given[", given, "] g[", g, "] orderNeeded[", orderNeeded, "]");
            }
            logger.debug(methodName, nj.getId(), "Given_per_round", given_per_round, "given", given, "needed", needed);
        } while ( (given_per_round > 0) && ( given < needed ));

        // Sometimes we can directly reassign a share, in which case the job isn't waiting any more.
        // We only care about setting a message if the poor thing is still totally starved of resources.
        if ( nj.countNShares() == 0 ) {
            nj.setReason("Waiting for defragmentation.");
        }

        return given;
    }