public String call()

in uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java [275:402]


  public String call() throws ServiceInitializationException, ServiceException {
    // we may fail in initialize() in which case the ServiceInitializationException
    // is thrown
    initialize();

    // now wait for application to call start
    awaitStart();

    // check ThreadLocal for a Map entry for this thread id. If not found, create
    // dedicated XStream instance for this thread which will be useed to serialize/deserialize
    // this thread's tasks
    if (threadLocalXStream.get().get(Thread.currentThread().getId()) == null) {
      threadLocalXStream.get().put(Thread.currentThread().getId(),
              XStreamUtils.getXStreamInstance());
    }
    // all threads intialized, enter running state

    IMetaTaskTransaction transaction = null;

    if (logger.isLoggable(Level.INFO)) {
      logger.log(Level.INFO,
              ".............. Thread " + Thread.currentThread().getId() + " ready to process");
    }

    while (running) {

      try {
        // send GET Request
        transaction = callGet(new MetaTaskTransaction());
        // the code may have blocked in callGet for awhile, so check
        // if service is still running. If this service is in quiescing
        // mode, finish processing current task. The while-loop will
        // terminate when the task is finished.
        if (!running && !quiescing) {
          break;
        }
        // transaction may be null if retryUntilSuccessfull was interrupted
        // due to stop
        if (Objects.isNull(transaction) || (!running && !quiescing)) {
          break;
        }
        logger.log(Level.FINE,
                ".............. Thread " + Thread.currentThread().getId() + " processing new task");
        if (Objects.isNull(transaction.getMetaTask())) {
          // this should only be the case when the service is stopping and transport is
          // shutdown
          if (running) {
            logger.log(Level.INFO, ".............. Thread " + Thread.currentThread().getId()
                    + " GET returned null MetaTask while service is in a running state - this is unexpected");
          }
          // if !running, the while loop above will terminate
          continue;
        }

        Object task = transaction.getMetaTask().getUserSpaceTask();

        // send ACK
        transaction = callAck(transaction);
        if (!running && !quiescing) {
          break;
        }
        IProcessResult processResult = processor.process((String) task);

        // assume success
        Action action = Action.CONTINUE;
        // check if process error occurred.
        String errorAsString = processResult.getError();

        if (processResult.terminateProcess()) {
          action = Action.TERMINATE;
        } else if (Objects.isNull(errorAsString)) {
          // success
          transaction.getMetaTask().setPerformanceMetrics(processResult.getResult());
        }
        if (Objects.nonNull(errorAsString)) {
          IMetaTask mc = transaction.getMetaTask();
          // the ducc.deploy.JpType is only present for jobs. If not specified
          // we return stringified exception to the client. The JD expects
          // Java Exception object for its error handling
          if (Objects.isNull(System.getProperty("ducc.deploy.JpType"))) {

            mc.setUserSpaceException(errorAsString);
          } else {
            logger.log(Level.INFO, "Sending Exception to JD:\n"
                    + ((Exception) processResult.getExceptionObject()));
            // JD expects serialized exception as byte[]
            mc.setUserSpaceException(serializeError(processResult.getExceptionObject()));
          }

        }

        // send END Request
        callEnd(transaction);
        if (running && Action.TERMINATE.equals(action)) {
          logger.log(Level.WARNING, "Processor Failure - Action=Terminate");
          // Can't stop using the current thread. This thread
          // came from a thread pool we want to stop. Need
          // a new/independent thread to call stop()
          new Thread(new Runnable() {

            @Override
            public void run() {
              delegateStop();
            }
          }).start();
          running = false;
        }

      } catch (IllegalStateException e) {
        break;
      } catch (TransportException e) {
        break;
      } catch (Exception e) {
        logger.log(Level.WARNING, "", e);
        break;
      }
    }
    stopLatch.countDown();
    System.out.println(Utils.getTimestamp() + ">>>>>>> " + Utils.getShortClassname(this.getClass())
            + ".call() >>>>>>>>>> Thread [" + Thread.currentThread().getId() + "] "
            + " ProtocolHandler stopped requesting new tasks - Stopping processor");
    logger.log(Level.INFO, "ProtocolHandler stopped requesting new tasks - Stopping processor");

    if (processor != null) {
      processor.stop();
    }
    return String.valueOf(Thread.currentThread().getId());
  }