public void reconcileProcesses()

in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java [733:894]


    /**
     * Reconciles the process map of an incoming work update against the previously known state
     * and pushes the differences into the scheduler's shares.
     *
     * <p>Three kinds of difference are handled:
     * <ul>
     *   <li>processes present only in {@code l} (new): the matching share is updated with the
     *       process's resident memory, work-item investment, state, init time window and PID;</li>
     *   <li>processes present only in {@code r} (gone): the scheduler is told the process
     *       completed;</li>
     *   <li>processes present in both with differing values: an active process updates its share,
     *       a complete process signals completion, any other state is logged and ignored.</li>
     * </ul>
     * Whenever a completion is processed for work whose type is not {@code Job}, the job is marked
     * complete so it does not get reallocations (UIMA-4326).
     *
     * @param jobid id of the work being reconciled; it is also passed to {@code Share.update} as a
     *              sanity check, and a rejected update is reported as a job/share mismatch
     * @param l     the incoming state of the work; cast to {@code IDuccWorkJob}
     * @param r     the existing (previously known) state of the work; cast to {@code IDuccWorkJob}
     * @throws SchedulingException if a share rejects an update for {@code jobid}, or if a gone
     *         process is seen while the job itself cannot be found
     */
    public void reconcileProcesses(DuccId jobid, IDuccWork l, IDuccWork r)
    {
        String methodName = "reconcileProcesses";
        IDuccProcessMap lpm = ((IDuccWorkJob) l).getProcessMap();
        IDuccProcessMap rpm = ((IDuccWorkJob) r).getProcessMap();

        @SuppressWarnings("unchecked")
        DuccMapDifference<DuccId, IDuccProcess> diffmap = DuccCollectionUtils.difference(lpm, rpm);

        // New processes: present only in the incoming (left) map.
        Map<DuccId, IDuccProcess> lproc = diffmap.getLeft();
        for ( IDuccProcess p : lproc.values() ) {
            // Update the share with the process information.  The jobid is passed in as a sanity
            // check so a mismatch between the share's job and this job is reported loudly.
            Share s = scheduler.getShare(p.getDuccId());
            if ( s == null ) {
                logger.warn(methodName, jobid, p.getDuccId(), "share not found?");
                continue;
            }

            long mem = p.getResidentMemory();
            long investment = p.getWiMillisInvestment();
            ProcessState state = p.getProcessState();
            String pid = p.getPID();            // sadly, the pid is almost always null here

            logger.info(methodName, jobid, "New process ", s.toString(), mem, state, pid);
            if ( ! s.update(jobid, mem, investment, state, p.getTimeWindowInit(), pid) ) {
                // TODO: probably change to just a warning and cancel the job - for now I want an attention-getter
                throw new SchedulingException(jobid, "Process assignment arrives for share " + s.toString() +
                                              " but jobid " + jobid + " does not match share " + s.getJob().getId());
            }
        }

        // Gone processes: present only in the existing (right) map.  These processes are done,
        // so look up the job and tell it the process is complete.
        Map<DuccId, IDuccProcess> rproc = diffmap.getRight();
        for ( IDuccProcess p : rproc.values() ) {
            Share s = scheduler.getShare(p.getDuccId());
            String shareDesc = ( s == null ) ? "<none>" : s.toString();
            IRmJob j = scheduler.getJob(jobid);
            if ( j == null ) {
                throw new SchedulingException(jobid, "Process completion arrives for share " + shareDesc +
                                              " but job " + jobid + " cannot be found.");
            }

            switch ( l.getDuccType() ) {        // UIMA-4326, if not a jobjob, the job must not get reallocations
                case Job:
                    break;
                default:
                    j.markComplete();
            }

            if ( s == null ) {
                // As in the other branches, the share may already have been removed; there is
                // nothing left to signal, so do not pass a null share to the scheduler.
                logger.warn(methodName, jobid,
                            String.format("Process %5s", p.getPID()),
                            "Completion: share not found.");
                continue;
            }

            scheduler.signalCompletion(j, s);
            logger.info(methodName, jobid,
                        String.format("Process %5s", p.getPID()),
                        "Completion:", s.toString());
        }

        // Changed processes: present in both maps but with differing values.
        for ( DuccMapValueDifference<IDuccProcess> pd : diffmap ) {
            IDuccProcess pl = pd.getLeft();     // incoming
            IDuccProcess pr = pd.getRight();    // existing

            Share s = scheduler.getShare(pl.getDuccId());
            String shareL = ( s == null ) ? "<none>" : s.toString();

            if ( logger.isTrace() ) {
                // Extreme debugging only; these values are needed solely for this message.
                Share sr = scheduler.getShare(pr.getDuccId());
                String shareR = ( sr == null ) ? "<none>" : sr.toString();

                ITimeWindow initL = pl.getTimeWindowInit();
                ITimeWindow initR = pr.getTimeWindowInit();
                long init_timeL = (initL == null) ? 0 : initL.getElapsedMillis();
                long init_timeR = (initR == null) ? 0 : initR.getElapsedMillis();

                logger.trace(methodName, jobid,
                             "\n\tReconciling. incoming.(did, pid, mem, state, share, initTime, investment)",
                             pl.getDuccId(),
                             pl.getPID(),
                             pl.getResidentMemory(),
                             pl.getProcessState(),
                             shareL,
                             init_timeL,
                             pl.getWiMillisInvestment(),
                             "\n\tReconciling. existing.(did, pid, mem, state, share, initTime, investment)",
                             pr.getDuccId(),
                             pr.getPID(),
                             pr.getResidentMemory(),
                             pr.getProcessState(),
                             shareR,
                             init_timeR,
                             pr.getWiMillisInvestment()
                             );
            } else {
                if ( (pr.getPID() == null) && (pl.getPID() != null) ) {
                    // NOTE(review): this branch only runs when trace is disabled, so this trace call
                    // never emits anything; confirm whether a higher log level was intended.
                    logger.trace(methodName, jobid,
                                 String.format("Process %5s", pl.getPID()),
                                 "PID assignment for share", shareL);
                }
                if ( pl.getProcessState() != pr.getProcessState() ) {
                    logger.info(methodName, jobid,
                                String.format("Process %5s", pl.getPID()), shareL,
                                "State:", pr.getProcessState(), "->", pl.getProcessState(),
                                getElapsedTime(pr.getTimeWindowInit()), getElapsedTime(pr.getTimeWindowRun()));
                }
            }

            long mem = pl.getResidentMemory();
            long investment = pl.getWiMillisInvestment();
            ProcessState state = pl.getProcessState();
            String pid = pl.getPID();

            if ( pl.isActive() ) {
                if ( s == null ) {
                    // this can happen if a node dies and the share is purged so it's ok.
                    logger.warn(methodName, jobid, "Update for share from process", pl.getPID(), pl.getDuccId(), "but cannot find share.");
                    continue;
                }

                // UIMA-3856: purged shares are deliberately NOT signalled complete here, or else
                // OR bugs will lose the state.

                if ( ! s.update(jobid, mem, investment, state, pl.getTimeWindowInit(), pid) ) {
                    // TODO: probably change to just a warning and cancel the job - for now I want an attention-getter
                    throw new SchedulingException(jobid, "Process update arrives for share " + s.toString() +
                                                  " but jobid " + jobid + " does not match job in share " + s.getJob().getId());
                }

            } else if ( pl.isComplete() ) {
                IRmJob j = scheduler.getJob(jobid);
                if ( j == null ) {
                    // Without this guard a missing job produced a NullPointerException below.
                    logger.warn(methodName, jobid, "Process", pl.getPID(), "completed in state", state, "but job cannot be found.");
                    continue;
                }

                if ( s != null ) {              // in some final states the share is already gone, not an error (e.g. Stopped)
                    scheduler.signalCompletion(j, s);          // signal the **process** (not job) is complete
                    logger.info(methodName, jobid, "Process", pl.getPID(), " completed due to state", state);
                }

                switch ( l.getDuccType() ) {        // UIMA-4326, if not a jobjob, the job must not get reallocations
                    case Job:
                        break;
                    default:
                        j.markComplete();
                }

            } else {
                logger.info(methodName, jobid, "Process", pl.getPID(), "ignoring update because of state", state);
            }
        }
    }