public String sendAndReceiveCAS()

in uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java [2186:2384]


  /**
   * Sends a CAS to the service and blocks the calling thread until a reply arrives, the request
   * times out, the client is stopped, or the thread is interrupted.
   *
   * <p>The per-thread monitor (a semaphore held in {@code threadMonitorMap}) is acquired once
   * before the CAS is dispatched and a second time afterwards; the second acquire is what blocks
   * the caller. Per the comments carried over from the original code, the monitor is released by
   * the reply, timeout or stop handling, which live outside this method.
   *
   * @param aCAS the CAS to send; a handle is kept in the {@code sendAndReceiveCAS} field so the
   *        reply can be deserialized into the same CAS
   * @param pt process trace handed to {@code deserializeAndCompleteProcessingReply}
   * @param componentMetricsList application-provided list stored on the request; per the
   *        original comment, performance stats are copied into it when the reply comes back
   * @param targetServiceId passed through to {@code sendCAS}
   * @return the CAS reference id returned by {@code sendCAS}
   * @throws ResourceProcessException if the client is not running, the thread is interrupted
   *         while waiting, the client is stopping or aborting, the request timed out, the reply
   *         carries a process exception, or handling the reply fails
   */
  public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt, List<AnalysisEnginePerformanceMetrics> componentMetricsList, String targetServiceId) throws ResourceProcessException {
    if (!running) {
      throw new ResourceProcessException(new Exception("Uima EE Client Not In Running State"));
    }
    if (!serviceDelegate.isSynchronousAPI()) {
      // Change the flag to indicate synchronous invocation.
      // This info will be needed to handle Ping replies.
      // Different code is used for handling PING replies for
      // sync and async API.
      serviceDelegate.setSynchronousAPI();
    }
    String casReferenceId = null;
    // keep handle to CAS, we'll deserialize into this same CAS later
    sendAndReceiveCAS = aCAS;

    final long threadId = Thread.currentThread().getId();

    // One ThreadMonitor per calling thread, created on first use.
    ThreadMonitor threadMonitor = null;
    if (threadMonitorMap.containsKey(threadId)) {
      threadMonitor = (ThreadMonitor) threadMonitorMap.get(threadId);
    } else {
      threadMonitor = new ThreadMonitor(threadId);
      threadMonitorMap.put(threadId, threadMonitor);
    }

    ClientRequest cachedRequest = produceNewClientRequestObject();
    cachedRequest.setSynchronousInvocation();
    // save application provided List where the performance stats will be copied
    // when reply comes back
    cachedRequest.setComponentMetricsList(componentMetricsList);

    // This is synchronous call, acquire and hold the semaphore before
    // dispatching a CAS to a service. The semaphore will be released
    // iff:
    // a) reply is received (success or failure with exception)
    // b) timeout occurs
    // c) client is stopped
    // Once the semaphore is acquired and the CAS is dispatched
    // the thread will block in trying to acquire the semaphore again
    // below.
    if (threadMonitor != null && threadMonitor.getMonitor() != null) {
      try {
        threadMonitor.getMonitor().acquire();
      } catch (InterruptedException e) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
          // Same argument layout as the interrupt handler in the wait loop below:
          // thread id, CAS reference id, CAS hash code.
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                  "sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_client_interrupted_INFO",
                  new Object[] { threadId, casReferenceId, String.valueOf(aCAS.hashCode()) });
        }
        // cancel the timer if it is associated with a CAS this thread is waiting for. This would be
        // the oldest CAS submitted to a queue for processing. The timer will be canceled and restarted
        // for the second oldest CAS in the outstanding list.
        // Note: casReferenceId is still null here, nothing has been sent yet.
        serviceDelegate.cancelTimerForCasOrPurge(casReferenceId);
        // The interrupt status is not restored; the InterruptedException travels as the cause.
        throw new ResourceProcessException(e);
      }
    }
    try {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        // Logged at FINE to match both the guard above and the message key.
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                "sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_cas_submitting_FINE",
                new Object[] { casReferenceId, String.valueOf(aCAS.hashCode()), threadId });
      }
      // send CAS. This call does not block. Instead we will block the sending thread below.
      casReferenceId = sendCAS(aCAS, cachedRequest, targetServiceId);

    } catch (ResourceProcessException e) {

      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_exception__WARNING", new Object[] { e });
      }
      // Give back the permit taken above. Guarded like the other monitor accesses so that a
      // missing monitor cannot replace the send failure with a NullPointerException.
      if (threadMonitor != null && threadMonitor.getMonitor() != null) {
        threadMonitor.getMonitor().release();
      }
      // casReferenceId is still null here because the assignment above did not complete.
      removeFromCache(casReferenceId);
      throw e;
    }
    if (threadMonitor != null && threadMonitor.getMonitor() != null) {
      while (running) {
        try {
          // Block sending thread until a reply is received. The thread
          // will be signaled either when a reply to the request just
          // sent is received OR a Ping reply was received. The latter
          // is necessary to allow handling of CASes delayed due to
          // a timeout. A previous request timed out and the service
          // state was changed to TIMEDOUT. While the service is in this
          // state all sending threads add outstanding CASes to the list
          // of CASes pending dispatch and each waits until the state
          // of the service changes to OK. The state is changed to OK
          // when the client receives a reply to a PING request. When
          // the Ping reply comes, the client will signal this thread.
          // The thread checks the list of CASes pending dispatch trying
          // to find an entry that matches ID of the CAS previously
          // delayed. If the CAS is found in the delayed list, it will
          // be removed from the list and sent to the service for
          // processing. The 'wasSignaled' flag is only set when the
          // CAS reply is received. Ping reply logic does not change
          // this flag.
          threadMonitor.getMonitor().acquire();
          // if the semaphore was cleared in the stop() method, the client
          // must be in a shutdown mode. Throw an exception back to the
          // caller. The CAS reply has not come back yet.
          if (threadMonitor.wasCalledFromStopMethod()) {
            throw new ResourceProcessException(new UimaAsClientStoppingException("Client is stopping - sendAndReceive() has been interrupted"));
          }
          // Send thread was awoken by either process reply or ping reply
          // If the service is in the ok state and the CAS is in the
          // list of CASes pending dispatch, remove the CAS from the list
          // and send it to the service.
          if (cachedRequest.isTimeoutException() || cachedRequest.isProcessException()) {
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
              // Logged at WARNING to match both the guard above and the message key.
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                      "sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAJMS_process_exception_handler5__WARNING",
                      new Object[] { String.valueOf(aCAS.hashCode()), threadId });
            }
            // The failure itself is reported after the while-loop
            break;
          }
          if (running && serviceDelegate.getState() == Delegate.OK_STATE
                  && serviceDelegate.removeCasFromPendingDispatchList(casReferenceId)) {
            // NOTE(review): the finally block below also releases the permit after this
            // re-dispatch, so the next acquire() may return without waiting for the reply,
            // depending on how the monitor's permits are managed elsewhere - confirm.
            sendCAS(aCAS, cachedRequest, targetServiceId);
          } else {
            break; // done here, received a reply or the client is not running
          }
        } catch (InterruptedException e) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                    "sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_client_interrupted_INFO",
                    new Object[] { threadId, casReferenceId, String.valueOf(aCAS.hashCode()) });
          }
          // try to remove from pending dispatch list. If not there, remove from pending reply list
          if (!serviceDelegate.removeCasFromPendingDispatchList(casReferenceId)) {
            serviceDelegate.removeCasFromOutstandingList(casReferenceId);
          }
          // cancel the timer if it is associated with a CAS this thread is waiting for. This would be
          // the oldest CAS submitted to a queue for processing. The timer will be canceled and restarted
          // for the second oldest CAS in the outstanding list.
          serviceDelegate.cancelTimerForCasOrPurge(casReferenceId);
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                    "sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_client_canceled_timer_INFO",
                    new Object[] { threadId, casReferenceId, String.valueOf(aCAS.hashCode()) });
          }
          removeFromCache(casReferenceId);
          // The interrupt status is not restored; the InterruptedException travels as the cause.
          throw new ResourceProcessException(e);
        } finally {
          // Runs on every exit from the try block: normal iteration, break, and both throws.
          threadMonitor.getMonitor().release();
        }
      }
    } // if

    if (abort) {
      throw new ResourceProcessException(new RuntimeException("Uima AS Client API Stopping"));
    }
    // check if timeout exception
    if (cachedRequest.isTimeoutException()) {
      String qName = endpointNameOrEmpty();
      // Request To Process Cas Has Timed-out.
      throw new ResourceProcessException(JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_process_timeout_WARNING",
              new Object[] { qName, getBrokerURI(), cachedRequest.getHostIpProcessingCAS() },
              new UimaASProcessCasTimeout("UIMA AS Client Timed Out Waiting for Reply From Service:"+qName+" Broker:"+getBrokerURI()));
    }
    // If a reply contains process exception, throw an exception and let the
    // listener decide what happens next
    if (cachedRequest.isProcessException()) {
      String qName = endpointNameOrEmpty();
      throw new ResourceProcessException(
              JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_received_exception_msg_INFO",
              new Object[] { qName, getBrokerURI(), casReferenceId },
              cachedRequest.getException());
    }
    try {
      // Process reply in the send thread
      Message message = cachedRequest.getMessage();
      if (message != null) {
        deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, false);
      }
    } catch (ResourceProcessException rpe) {
      throw rpe;
    } catch (Exception e) {
      // Anything else is wrapped so callers only have to handle ResourceProcessException.
      throw new ResourceProcessException(e);
    }
    return casReferenceId;
  }

  /**
   * Looks up the endpoint name for use in error messages. A failed lookup is logged at WARNING
   * and an empty string is returned instead, so the lookup never hides the error being reported.
   */
  private String endpointNameOrEmpty() {
    try {
      return getEndPointName();
    } catch (Exception e) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
              "sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_exception__WARNING", e);
      return "";
    }
  }