in uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java [733:894]
/**
 * Reconciles the process map of one job description ({@code l}, logged below as
 * "incoming") against another ({@code r}, logged below as "existing") and pushes
 * the differences into the scheduler's shares.
 *
 * The work is done in three passes over the map difference:
 * <ol>
 *   <li>processes found only on the left side: the matching share is updated with
 *       the process' memory, investment, state, init window and pid;</li>
 *   <li>processes found only on the right side: treated as finished, and the
 *       scheduler is told the share has completed;</li>
 *   <li>processes reported on both sides with differing values: state changes are
 *       logged, then the share is updated (active process) or completed
 *       (complete process).</li>
 * </ol>
 *
 * Both {@code l} and {@code r} are cast to {@code IDuccWorkJob} without a type check.
 *
 * @param jobid id of the job that owns the processes; passed to {@code Share.update}
 *              as a sanity check and used to look the job up in the scheduler
 * @param l     work whose process map forms the left side of the difference
 * @param r     work whose process map forms the right side of the difference
 * @throws SchedulingException if a share rejects an update for {@code jobid}, or if
 *                             a right-only process arrives and the job cannot be found
 */
public void reconcileProcesses(DuccId jobid, IDuccWork l, IDuccWork r)
{
    String methodName = "reconcileProcess";
    IDuccProcessMap lpm = ((IDuccWorkJob) l).getProcessMap();
    IDuccProcessMap rpm = ((IDuccWorkJob) r).getProcessMap();

    @SuppressWarnings("unchecked")
    DuccMapDifference<DuccId, IDuccProcess> diffmap = DuccCollectionUtils.difference(lpm, rpm);

    // ---- Pass 1: new stuff is in the left side of the map ----
    Map<DuccId, IDuccProcess> lproc = diffmap.getLeft();
    for ( IDuccProcess p : lproc.values() ) {
        // look up share, update resident memory, process state, investment (eventually), maybe pid?
        // simply update the share with the information.  we pass in the jobid as a sanity check so
        // we can crash or at least complain loudly on mismatch.
        Share s = scheduler.getShare(p.getDuccId());
        if ( s == null ) {
            logger.warn(methodName, jobid, p.getDuccId(), "share not found?");
            continue;
        }

        long mem = p.getResidentMemory();
        long investment = p.getWiMillisInvestment();
        ProcessState state = p.getProcessState();
        String pid = p.getPID();          // sadly, the pid is almost always null here

        logger.info(methodName, jobid, "New process ", s.toString(), mem, state, pid);
        if ( ! s.update(jobid, mem, investment, state, p.getTimeWindowInit(), pid) ) {
            // TODO: probably change to just a warning and cancel the job - for now I want an attention-getter
            throw new SchedulingException(jobid, "Process assignemnt arrives for share " + s.toString() +
                                          " but jobid " + jobid + " does not match share " + s.getJob().getId());
        }
    }

    // ---- Pass 2: gone stuff is in the right side of the map ----
    Map<DuccId, IDuccProcess> rproc = diffmap.getRight();
    for ( IDuccProcess p : rproc.values() ) {
        // these processes are done.  look up the job and tell it process complete.
        Share s = scheduler.getShare(p.getDuccId());
        IRmJob j = scheduler.getJob(jobid);
        if ( j == null ) {
            // The share may be missing as well; never let that mask the real diagnostic with an NPE.
            String shareName = ( s == null ) ? "<none>" : s.toString();
            throw new SchedulingException(jobid, "Process completion arrives for share " + shareName +
                                          " but job " + jobid + " cannot be found.");
        }

        switch ( l.getDuccType() ) {      // UIMA-4326, if not a jobjob, the job must not get reallocations
            case Job:
                break;
            default:
                j.markComplete();
        }

        if ( s != null ) {
            scheduler.signalCompletion(j, s);
            logger.info(methodName, jobid,
                        String.format("Process %5s", p.getPID()),
                        "Completion:", s.toString());
        } else {
            // The share can already be gone (the later passes of this method tolerate that too),
            // so there is nothing left to signal.
            logger.warn(methodName, jobid, "Process", p.getPID(), p.getDuccId(), "completed but cannot find share.");
        }
    }

    // ---- Pass 3: processes reported on both sides whose values differ ----
    for ( DuccMapValueDifference<IDuccProcess> pd : diffmap ) {
        IDuccProcess pl = pd.getLeft();
        IDuccProcess pr = pd.getRight();

        Share sl = scheduler.getShare(pl.getDuccId());
        Share sr = scheduler.getShare(pr.getDuccId());
        String shareL = ( sl == null ) ? "<none>" : sl.toString();
        String shareR = ( sr == null ) ? "<none>" : sr.toString();

        ITimeWindow initL = pl.getTimeWindowInit();
        ITimeWindow initR = pr.getTimeWindowInit();
        long init_timeL = (initL == null) ? 0 : initL.getElapsedMillis();
        long init_timeR = (initR == null) ? 0 : initR.getElapsedMillis();

        /** extreme debugging only*/
        if ( logger.isTrace() ) {
            logger.trace(methodName, jobid,
                         "\n\tReconciling. incoming.(did, pid, mem, state, share, initTime, investment)",
                         pl.getDuccId(),
                         pl.getPID(),
                         pl.getResidentMemory(),
                         pl.getProcessState(),
                         shareL,
                         init_timeL,
                         pl.getWiMillisInvestment(),
                         "\n\tReconciling. existing.(did, pid, mem, state, share, initTime, investment)",
                         pr.getDuccId(),
                         pr.getPID(),
                         pr.getResidentMemory(),
                         pr.getProcessState(),
                         shareR,
                         init_timeR,
                         pr.getWiMillisInvestment()
                         );
        } else {
            if ( (pr.getPID() == null) && (pl.getPID() != null) ) {
                // NOTE(review): this is a trace-level call inside the branch taken when trace is
                // off, so it presumably never emits anything - confirm whether that is intended.
                logger.trace(methodName, jobid,
                             String.format("Process %5s", pl.getPID()),
                             "PID assignement for share", shareL);
            }
            if ( pl.getProcessState() != pr.getProcessState() ) {
                logger.info(methodName, jobid,
                            String.format("Process %5s", pl.getPID()), shareL,
                            "State:", pr.getProcessState(), "->", pl.getProcessState(),
                            getElapsedTime(pr.getTimeWindowInit()), getElapsedTime(pr.getTimeWindowRun()));
            }
        }

        long mem = pl.getResidentMemory();
        long investment = pl.getWiMillisInvestment();
        ProcessState state = pl.getProcessState();
        String pid = pl.getPID();
        Share s = sl;                     // same id as the lookup above, no need to ask the scheduler twice

        if ( pl.isActive() ) {
            if ( s == null ) {
                // this can happen if a node dies and the share is purged so it's ok.
                logger.warn(methodName, jobid, "Update for share from process", pl.getPID(), pl.getDuccId(), "but cannot find share.");
                continue;
            }

            // UIMA-3856 Can't do anything or else OR bugs will lose the state :(
            // if ( s.isPurged() ) {
            //     IRmJob j = scheduler.getJob(jobid);
            //     scheduler.signalCompletion(j, s);
            //     logger.info(methodName, jobid, "Process", pl.getPID(), "marked complete because it is purged. State:", state);
            // }

            if ( ! s.update(jobid, mem, investment, state, pl.getTimeWindowInit(), pid) ) {
                // TODO: probably change to just a warning and cancel the job - for now I want an attention-getter
                throw new SchedulingException(jobid, "Process update arrives for share " + s.toString() +
                                              " but jobid " + jobid + " does not match job in share " + s.getJob().getId());
            }
        } else if ( pl.isComplete() ) {
            // NOTE(review): unlike pass 2, the job is not checked for null here; a missing job
            // would fail on markComplete() below for non-Job work types - confirm whether it can happen.
            IRmJob j = scheduler.getJob(jobid);
            if ( s != null ) {            // in some final states the share is already gone, not an error (e.g. Stopped)
                scheduler.signalCompletion(j, s);          // signal the **process** (not job) is complete
                logger.info(methodName, jobid, "Process", pl.getPID(), " completed due to state", state);
            }

            switch ( l.getDuccType() ) {  // UIMA-4326, if not a jobjob, the job must not get reallocations
                case Job:
                    break;
                default:
                    j.markComplete();
            }
        } else {
            logger.info(methodName, jobid, "Process", pl.getPID(), "ignoring update because of state", state);
        }
    }
}