in uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java [275:402]
/**
 * Task loop executed by a single process thread. After {@code initialize()} and
 * {@code awaitStart()} return, the thread repeatedly fetches a task (GET), acknowledges it
 * (ACK), passes it to the processor and reports the outcome (END) until the {@code running}
 * flag is cleared, a transport/processing exception occurs, or the processor asks to
 * terminate. On exit the stop latch is counted down and the processor is stopped.
 *
 * @return the id of the thread that executed this call, as a string
 * @throws ServiceInitializationException
 *           propagated from {@code initialize()}
 * @throws ServiceException
 *           declared by the signature; presumably raised by {@code awaitStart()} or
 *           {@code initialize()} - confirm against those methods
 */
public String call() throws ServiceInitializationException, ServiceException {
  // we may fail in initialize() in which case the ServiceInitializationException
  // is thrown
  initialize();
  // now wait for application to call start
  awaitStart();

  // the id of the current thread cannot change while this method runs, fetch it once
  final long threadId = Thread.currentThread().getId();

  // check ThreadLocal for a Map entry for this thread id. If not found, create
  // dedicated XStream instance for this thread which will be used to serialize/deserialize
  // this thread's tasks
  if (threadLocalXStream.get().get(threadId) == null) {
    threadLocalXStream.get().put(threadId, XStreamUtils.getXStreamInstance());
  }
  // all threads initialized, enter running state
  if (logger.isLoggable(Level.INFO)) {
    logger.log(Level.INFO, ".............. Thread " + threadId + " ready to process");
  }
  try {
    while (running) {
      try {
        // send GET Request
        IMetaTaskTransaction transaction = callGet(new MetaTaskTransaction());
        // The code may have blocked in callGet for awhile, so check if the service is
        // still running. If this service is in quiescing mode, finish processing the
        // current task; the while-loop will terminate when the task is finished.
        // The transaction may also be null if retryUntilSuccessfull was interrupted
        // due to stop.
        if (Objects.isNull(transaction) || (!running && !quiescing)) {
          break;
        }
        if (logger.isLoggable(Level.FINE)) {
          logger.log(Level.FINE, ".............. Thread " + threadId + " processing new task");
        }
        if (Objects.isNull(transaction.getMetaTask())) {
          // this should only be the case when the service is stopping and transport is
          // shutdown
          if (running) {
            logger.log(Level.INFO, ".............. Thread " + threadId
                    + " GET returned null MetaTask while service is in a running state - this is unexpected");
          }
          // if !running, the while loop above will terminate
          continue;
        }
        // grab the task before the ACK reply replaces the transaction
        Object task = transaction.getMetaTask().getUserSpaceTask();
        // send ACK
        // NOTE(review): the ACK reply is not null-checked; if callAck can return null the
        // same way callGet can, the code below fails with a NullPointerException that is
        // handled by the generic catch - confirm against callAck.
        transaction = callAck(transaction);
        if (!running && !quiescing) {
          break;
        }
        IProcessResult processResult = processor.process((String) task);
        // assume success
        Action action = Action.CONTINUE;
        // check if process error occurred.
        String errorAsString = processResult.getError();
        if (processResult.terminateProcess()) {
          action = Action.TERMINATE;
        } else if (Objects.isNull(errorAsString)) {
          // success
          transaction.getMetaTask().setPerformanceMetrics(processResult.getResult());
        }
        if (Objects.nonNull(errorAsString)) {
          IMetaTask mc = transaction.getMetaTask();
          // the ducc.deploy.JpType is only present for jobs. If not specified
          // we return stringified exception to the client. The JD expects
          // Java Exception object for its error handling
          if (Objects.isNull(System.getProperty("ducc.deploy.JpType"))) {
            mc.setUserSpaceException(errorAsString);
          } else {
            // string concatenation needs no cast; avoids a ClassCastException should the
            // error object ever be a non-Exception Throwable
            logger.log(Level.INFO,
                    "Sending Exception to JD:\n" + processResult.getExceptionObject());
            // JD expects serialized exception as byte[]
            mc.setUserSpaceException(serializeError(processResult.getExceptionObject()));
          }
        }
        // send END Request
        callEnd(transaction);
        if (running && Action.TERMINATE.equals(action)) {
          logger.log(Level.WARNING, "Processor Failure - Action=Terminate");
          // Can't stop using the current thread. This thread
          // came from a thread pool we want to stop. Need
          // a new/independent thread to call stop()
          new Thread(new Runnable() {
            @Override
            public void run() {
              delegateStop();
            }
          }).start();
          running = false;
        }
      } catch (IllegalStateException e) {
        // ends the task loop; logged at FINE so the cause is not lost entirely
        if (logger.isLoggable(Level.FINE)) {
          logger.log(Level.FINE,
                  ".............. Thread " + threadId + " leaving task loop on IllegalStateException",
                  e);
        }
        break;
      } catch (TransportException e) {
        // ends the task loop; logged at FINE so the cause is not lost entirely
        if (logger.isLoggable(Level.FINE)) {
          logger.log(Level.FINE,
                  ".............. Thread " + threadId + " leaving task loop on TransportException",
                  e);
        }
        break;
      } catch (Exception e) {
        logger.log(Level.WARNING, "", e);
        break;
      }
    }
  } finally {
    // always release the latch, even if an Error escapes the loop, so that whoever
    // awaits it is not left waiting for this thread
    stopLatch.countDown();
  }
  System.out.println(Utils.getTimestamp() + ">>>>>>> " + Utils.getShortClassname(this.getClass())
          + ".call() >>>>>>>>>> Thread [" + threadId + "] "
          + " ProtocolHandler stopped requesting new tasks - Stopping processor");
  logger.log(Level.INFO, "ProtocolHandler stopped requesting new tasks - Stopping processor");
  if (processor != null) {
    processor.stop();
  }
  return String.valueOf(threadId);
}