void detectFragmentation()

in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java [2296:2512]


    void detectFragmentation(HashMap<IRmJob, Integer> needed_by_job)
    {
    	String methodName = "detectFragmentation";

        if ( logger.isDebug() ) {
            logger.debug(methodName, null, "vMachines:", fmtArray(globalNodepool.cloneVMachinesByOrder()));
        }

        List<NodePool> poollist = new ArrayList<NodePool>();
        getNodepools(globalNodepool, poollist);
        NodePool[] allPools = poollist.toArray(new NodePool[poollist.size()]);

        if ( logger.isDebug() ) {
            // make sure the node pool list is built correctly
            StringBuffer sb = new StringBuffer("Nodepools:");
            for ( NodePool np : allPools ) {
                sb.append(np.getId());
                sb.append(" ");
            }
            logger.debug(methodName, null, sb.toString());
        }

        // These next two maps are built lazily, once per call to this routine
        Map<String, int[]> vshares = new HashMap<String, int[]>();     // Virtual shares of each order in each nodepool.
                                                                       // This gets enhanced with pending evictions so we
                                                                       // can tell whether enough space is accounted for
                                                                       // for each job.
        Map<String, int[]> nshares = new HashMap<String, int[]>();     // For each order, the number of shares available,
                                                                       // either directly, or through splits from higher-order
                                                                       // space, enhanced with pending evictions and purges.
        Map<String, Map<IRmJob, Integer>> jobs = new HashMap<String, Map<IRmJob, Integer>>();
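        // Illustrative example (hypothetical values): with a largest share order of 4,
        // a pool's vshares entry might be [0, 2, 1, 0, 0] -- two machines whose eventual
        // free space can form at most an order-1 share and one that can form an order-2
        // share. Index 0 is never updated here, since zero-share machines are skipped.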

        for ( int npi = 0; npi < allPools.length; npi++ ) {                       // Turns out to be a depth-first traversal!
            // First pass, init the structures, including any free space that may have been unusable
            NodePool np = allPools[npi];
            String id = np.getId();

            int[] vmach = globalNodepool.makeArray();
            int[] nmach = globalNodepool.makeArray();
            Map<IRmJob, Integer> jobmap = new HashMap<IRmJob, Integer>();
            vshares.put(id, vmach);
            nshares.put(id, nmach);
            jobs.put(id, jobmap);
        }

        boolean must_defrag = false;
        String headerfmt = "%14s %20s %6s %4s %7s %6s %2s";
        String datafmt   = "%14s %20s %6d %4d %7d %6d %2d";

        for ( ResourceClass rc : resourceClasses.values() ) {
            // Next: Look at every job and work out its "need".  Collect jobs by nodepool into the jobmaps.
            Map<IRmJob, IRmJob> allJobs = rc.getAllJobs();
            String npn = rc.getNodepoolName();
            Map<IRmJob, Integer> jobmap = jobs.get(npn);

            if ( allJobs.size() == 0 ) continue;

            logger.info(methodName, null, String.format(headerfmt, "Nodepool", "User", "PureFS", "NSh", "Counted", "Needed", "O"), "Class:", rc.getName());
            for ( IRmJob j : allJobs.values() ) {

                if ( j.isRefused() ) {
                    continue;
                }

                if ( j.isDeferred() ) {
                    continue;
                }

                int counted = j.countNSharesGiven();          // accounting for ramp-up, various caps, etc. 

                int current = j.countNShares();                // currently allocated, plus pending, less those removed by earlier preemption
                int needed = counted - current;                // could go negative if it's evicting
                int order = j.getShareOrder();
                
                if ( j.getSchedulingPolicy() == Policy.FAIR_SHARE ) {   // cap on frag threshold
                    if ( current >= fragmentationThreshold ) { 
                        needed = 0;
                    } else if ( current >= j.getPureFairShare() ) {     // more than our pure share, we're not needy
                        needed = 0;
                    } else if ( needed < 0 ) {                          // more than our count, likely evicting
                        needed = 0;
                    } else if ( needed > 0) {
                        needed = Math.min(needed, fragmentationThreshold);
                        jobmap.put(j, needed);                 // we'll log this in a minute, not here
                        must_defrag = true;
                    }                    
                } else {                                       // if not fair-share, must always try to defrag;
                                                               // if 'needed' is 0 the job already has its full
                                                               // allocation, and therefore cannot be needy
                    // UIMA-4275 We rely on quotas to keep 'needed' under control
                    if ( needed > 0 ) {
                        jobmap.put(j, needed);   
                        must_defrag = true;
                    }
                }
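                // Illustrative example: a FAIR_SHARE job with counted=10, current=4, and
                // fragmentationThreshold=8 gets needed = min(10-4, 8) = 6 and is marked
                // potentially needy; one with current=9 >= the threshold gets needed = 0.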


                logger.info(methodName, j.getId(), 
                            String.format(datafmt, 
                                          npn,
                                          j.getUser().getName(),
                                          j.getPureFairShare(),
                                          current,
                                          counted,
                                          needed,
                                          order),
                            (needed > 0) ? "POTENTIALLY NEEDY" : ""
                            );

            }
        }
        if ( ! must_defrag ) return;                          // the rest of this is expensive, let's bypass if we can

        for ( int npi = 0; npi < allPools.length; npi++ ) {                       // Turns out to be a depth-first traversal!
            // Next pass, find all the open and potentially open spots in each nodepool.  We need to coalesce
            // evicted shares with each other and with open space on each machine in order to know whether the
            // space is potentially usable by jobs.
            NodePool np = allPools[npi];

            Map<Node, Machine> machs = np.getAllMachinesForPool();
            for ( Machine m : machs.values() ) {
                int free = m.countFreedUpShares();                           // free space plus evicted shares, i.e. the eventual free space
                if ( free != 0 ) {
                    logger.trace(methodName, null, "Freed shares", free, "on machine", m.getId(), "in nodepool", np.getId());
                    for ( NodePool npj = np; npj != null; npj = npj.getParent() ) {        // must propagate up because of how these tables work
                        String id_j = npj.getId();
                        int[] vmach_j = vshares.get(id_j);
                        if (free >= vmach_j.length) {               // UIMA-5605 Avoid out-of-bounds exception
                          logger.warn(methodName, null, "Nodepool",id_j,"has a maximum size of", vmach_j.length-1, 
                                  "but the computed free space is", free);
                          free = vmach_j.length - 1;
                        }
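                        // e.g. if the pool's largest order is 4 (array length 5) but a
                        // machine reports 6 freed quantum shares, 'free' is clamped to 4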
                        logger.trace(methodName, null, "Update v before: NP[", id_j, "] v:", fmtArray(vmach_j));
                        vmach_j[free]++;                                         // This is the largest potential share that can be made on this machine,
                                                                                 // after evictions starting 'here' and propagating up
                        logger.trace(methodName, null, "Update v after : NP[", id_j, "] v:", fmtArray(vmach_j));
                    }
                }                
            }
        }
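        // At this point each pool's vshares entry counts, per order, the machines whose
        // eventual free space (current free plus pending evictions) could form a single
        // share of that order, rolled up into every ancestor pool.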
        
        for ( int npi = 0; npi < allPools.length; npi++ ) {
            // Next pass, create the cumulative "n" style table from the "v" table of holes
            String id = allPools[npi].getId();

            int[] vmach = vshares.get(id);
            int[] nmach = nshares.get(id);
            reworknShares(vmach, nmach);                                 // Populate nmach from vmach for this np, with free or potentially free shares
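            // Illustrative example (assumption: an order-k hole splits into k/o order-o
            // shares): vmach = [0, 1, 0, 0, 1] reworks to nmach = [0, 5, 2, 1, 1].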

            if ( logger.isInfo() ) {
                logger.info(methodName, null, "NP", id, "After check: virtual    free Space", fmtArray(vmach));
                logger.info(methodName, null, "NP", id, "After check: cumulative free Space", fmtArray(nmach));
            }
        }

        for ( int npi = 0; npi < allPools.length; npi++ ) {
            // Last step, traverse jobs by nodepool and determine their need.
            String id = allPools[npi].getId();

            Map<IRmJob, Integer> jobmap = jobs.get(id);
            if ( jobmap.size() == 0 ) continue;                          // only looking at potentially needy jobs

            int[] nmach = nshares.get(id);

            for ( IRmJob j : jobmap.keySet() ) {

                int needed = jobmap.get(j);
                int order = j.getShareOrder();
                int available = nmach[order];
                int to_remove = 0;

                needyJobs.put(j, j);
                
                if ( available >= needed ) {
                    to_remove = needed;                        // all of this job's need can be met from the holes
                    needed = 0;
                } else {
                    to_remove = available;
                    needed -= to_remove;
                }
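                // e.g. available=7, needed=5  ->  to_remove=5, needed=0 (fully coverable);
                //      available=2, needed=5  ->  to_remove=2, needed=3 (still needy by 3)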
                
                if ( to_remove > 0 ) {
                    NodePool np = allPools[npi];
                    for ( NodePool npj = np; npj != null; npj = npj.getParent() ) {        // must propagate up because of how these tables work
                        String id_j = npj.getId();
                        int[] vmach_j = vshares.get(id_j);
                        int[] nmach_j = nshares.get(id_j);
                        removeSharesByOrder(vmach_j, nmach_j, to_remove, order);
                    }
                }
                
                if ( needed > 0 ) {
                    needed_by_job.put(j, needed);
                    logger.info(methodName, j.getId(), 
                                String.format("NP: %10s User: %10s Pure fs: %3d needed %3d O[%d] %s",
                                              id,
                                              j.getUser().getName(),
                                              j.getPureFairShare(),
                                              needed,
                                              order,
                                              "ACTUALLY NEEDY"
                                              ));
                }
            }
        }            
    }
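
For reference, a minimal sketch of what reworknShares() plausibly computes from the
vshares table. The real implementation lives elsewhere in the RM sources and is not
shown here; the split rule (an order-k hole yields k/o order-o shares, by integer
division) is an assumption for illustration, not the confirmed DUCC algorithm.

    // Illustrative sketch only -- NOT the actual DUCC implementation.
    // Assumption: each order-k hole counted in 'vmach' can be split into
    // (k / o) shares of order o, so 'nmach' accumulates, per order, every
    // share obtainable directly or by splitting larger holes.
    void reworknSharesSketch(int[] vmach, int[] nmach)
    {
        for ( int o = 1; o < nmach.length; o++ ) {
            int sum = 0;
            for ( int k = o; k < vmach.length; k++ ) {
                sum += vmach[k] * (k / o);                   // integer division
            }
            nmach[o] = sum;
        }
    }

Under that assumption, vmach = [0, 1, 0, 0, 1] (one order-1 hole, one order-4 hole)
yields nmach = [0, 5, 2, 1, 1]: the order-4 hole alone supplies four order-1 shares,
two order-2 shares, or one order-3 share.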