in uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java [185:324]
/**
 * Handles a "get" request from a remote worker thread: obtains a candidate
 * work item (CAS), attaches response hints to the transaction for any
 * condition that prevents or qualifies the dispatch, records the dispatch
 * when a CAS is handed out, and finally drives the work item's FSM with
 * either {@code CAS_Available} or {@code CAS_Unavailable}.
 *
 * <p>Each of the exhausted / premature / job-discontinued /
 * process-discontinued conditions is logged at most once per remote worker
 * process (tracked through the corresponding {@code warned*} map), while the
 * matching hint is added to every affected transaction.
 *
 * <p>Any exception raised while processing is logged and causes the job to
 * be killed with {@code CompletionType.Exception}.
 *
 * @param objectData the action data for this transition; cast
 *                   unconditionally to {@code IActionData}. When
 *                   {@code null}, only a warning is attempted.
 */
public void engage(Object objectData) {
	String location = "engage";
	logger.trace(location, ILogger.null_id, "enter");
	IActionData actionData = (IActionData) objectData;
	try {
		if(actionData != null) {
			IRemoteWorkerThread rwt = actionData.getRemoteWorkerThread();
			WiTracker tracker = WiTracker.getInstance();
			// NOTE(review): a null result from find() surfaces as an NPE below,
			// which the catch-all turns into a job kill.
			IWorkItem wi = tracker.find(rwt);
			IFsm fsm = wi.getFsm();
			IMetaTaskTransaction trans = actionData.getMetaCasTransaction();
			IRemoteWorkerProcess rwp = new RemoteWorkerProcess(trans);
			//
			JobDriver jd = JobDriver.getInstance();
			JobDriverHelper jdh = JobDriverHelper.getInstance();
			jd.advanceJdState(JdState.Active);
			IMetaTask metaCas = null;
			JobProcessBlacklist jobProcessBlacklist = JobProcessBlacklist.getInstance();
			IMetaMetaCas mmc = getMetaMetaCas(actionData);
			// Exhausted: log once per process, hint on every transaction.
			if(mmc.isExhausted()) {
				Long time = warnedExhausted.putIfAbsent(rwp, Long.valueOf(System.currentTimeMillis()));
				if(time == null) {
					MessageBuffer mbx = LoggerHelper.getMessageBuffer(actionData);
					mbx.append(Standardize.Label.node.get()+rwp.getNodeName());
					mbx.append(Standardize.Label.pid.get()+rwp.getPid());
					mbx.append(Standardize.Label.text.get()+allCasesProcessed);
					logger.debug(location, ILogger.null_id, mbx.toString());
				}
				TransactionHelper.addResponseHint(trans, Hint.Exhausted);
			}
			// Exactly one of the following branches runs; only the last one
			// actually hands out a CAS, so metaCas stays null in all others.
			if(mmc.isPremature()) {
				Long time = warnedPremature.putIfAbsent(rwp, Long.valueOf(System.currentTimeMillis()));
				if(time == null) {
					// First report for this process: kill the job, then log.
					String text = fewerWorkItemsAvailableThanExpected;
					jd.killJob(CompletionType.Exception, text);
					MessageBuffer mbx = LoggerHelper.getMessageBuffer(actionData);
					mbx.append(Standardize.Label.node.get()+rwp.getNodeName());
					mbx.append(Standardize.Label.pid.get()+rwp.getPid());
					mbx.append(Standardize.Label.text.get()+text);
					logger.debug(location, ILogger.null_id, mbx.toString());
				}
				TransactionHelper.addResponseHint(trans, Hint.Premature);
			}
			else if(mmc.isKillJob()) {
				Long time = warnedJobDiscontinued.putIfAbsent(rwp, Long.valueOf(System.currentTimeMillis()));
				if(time == null) {
					MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
					mb.append(Standardize.Label.node.get()+rwp.getNodeName());
					mb.append(Standardize.Label.pid.get()+rwp.getPid());
					mb.append(Standardize.Label.text.get()+"job discontinued");
					logger.warn(location, ILogger.null_id, mb.toString());
				}
				TransactionHelper.addResponseHint(trans, Hint.Killed);
				// Give the fetched item back instead of dispatching it.
				ungetMetaMetaCas(actionData,rwp,mmc,RetryReason.ProcessVolunteered);
			}
			else if(jobProcessBlacklist.includes(rwp)) {
				// putIfAbsent (not put) so the first-warning timestamp is kept,
				// consistent with the other warned* maps.
				Long time = warnedProcessDiscontinued.putIfAbsent(rwp, Long.valueOf(System.currentTimeMillis()));
				if(time == null) {
					MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
					mb.append(Standardize.Label.node.get()+rwp.getNodeName());
					mb.append(Standardize.Label.pid.get()+rwp.getPid());
					mb.append(Standardize.Label.text.get()+"process discontinued");
					logger.warn(location, ILogger.null_id, mb.toString());
				}
				TransactionHelper.addResponseHint(trans, Hint.Blacklisted);
				// Give the fetched item back instead of dispatching it.
				ungetMetaMetaCas(actionData,rwp,mmc,RetryReason.ProcessDown);
			}
			else {
				metaCas = mmc.getMetaCas();
			}
			// Both the work item and the outgoing transaction carry the
			// (possibly null) CAS.
			wi.setMetaCas(metaCas);
			trans.setMetaTask(metaCas);
			IWorkItemStateKeeper wisk = jd.getWorkItemStateKeeper();
			MetaCasHelper metaCasHelper = new MetaCasHelper(metaCas);
			IProcessStatistics pStats = jdh.getProcessStatistics(rwp);
			//
			IEvent event = null;
			//
			if(metaCas != null) {
				// A CAS is being dispatched: record it in the state keeper and
				// the per-process statistics, and stamp the get time.
				int seqNo = metaCasHelper.getSystemKey();
				String wiId = metaCas.getUserKey();
				String node = rwt.getNodeAddress();
				String pid = ""+rwt.getPid();
				String tid = ""+rwt.getTid();
				wisk.start(seqNo, wiId, node, pid, tid);
				wisk.queued(seqNo);
				pStats.dispatch(wi);
				//
				wi.setTodGet();
				event = WiFsm.CAS_Available;
				MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
				JobDriver.getInstance().getMessageHandler().incGets();
				logger.info(location, ILogger.null_id, mb.toString());
			}
			else {
				// Nothing to dispatch. The exhausted/premature hints (and their
				// one-time logging / job kill) were already handled above, so
				// they are intentionally not repeated here; repeating them would
				// only add the same hint to the transaction a second time.
				event = WiFsm.CAS_Unavailable;
				MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
				mb.append("No CAS found for processing");
				logger.debug(location, ILogger.null_id, mb.toString());
			}
			//
			fsm.transition(event, actionData);
		}
		else {
			// NOTE(review): actionData is null here; this relies on
			// LoggerHelper.getMessageBuffer tolerating null - confirm.
			MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
			mb.append("No action data found for processing");
			logger.warn(location, ILogger.null_id, mb.toString());
		}
	}
	catch(Exception e) {
		// Any failure while serving a get is treated as fatal for the job.
		logger.error(location, ILogger.null_id, e);
		JobDriver.getInstance().killJob(CompletionType.Exception);
	}
	logger.trace(location, ILogger.null_id, "exit");
}