protected void notifyOnTimout()

in uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java [2414:2543]


  /**
   * Reacts to the expiry of a client-side timer: logs the event, reports a failure status to the
   * registered listeners and releases whatever resource the timed-out request was guarding.
   *
   * <p>Behaviour per timeout kind:
   * <ul>
   * <li>{@code MetadataTimeout} - reports a failed "GetMeta" event, sets {@code abort} and
   * releases {@code getMetaSemaphore}.</li>
   * <li>{@code PingTimeout} - reports a failed "Ping" event, releases the CAS of one arbitrary
   * request found in {@code clientCache} (if any) and sets {@code abort}.</li>
   * <li>{@code CpCTimeout} - releases {@code cpcReplySemaphore} and reports a failed "CpC"
   * event.</li>
   * <li>{@code ProcessTimeout} - does nothing when {@code casReferenceId} is {@code null} or no
   * longer present in {@code clientCache}. Otherwise records the reply latency, marks the request
   * as timed out, unblocks the waiting thread (synchronous invocation) or notifies the listeners
   * (asynchronous invocation), removes the request from the bookkeeping structures and, for
   * asynchronous invocations with replies still pending, sends the oldest outstanding CAS
   * again.</li>
   * </ul>
   * Any other value of {@code aTimeoutKind} is ignored.
   *
   * @param aCAS
   *          the CAS associated with the timed-out request; passed to the status object and, for
   *          asynchronous process timeouts, to the listeners
   * @param anEndpoint
   *          name of the service queue, used in log and exception messages
   * @param aTimeoutKind
   *          one of {@code MetadataTimeout}, {@code PingTimeout}, {@code CpCTimeout} or
   *          {@code ProcessTimeout}
   * @param casReferenceId
   *          id of the CAS whose request timed out; only consulted for {@code ProcessTimeout}
   */
  protected void notifyOnTimout(CAS aCAS, String anEndpoint, int aTimeoutKind, String casReferenceId) {

    ProcessTrace pt = new ProcessTrace_impl();
    UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt, aCAS, casReferenceId);

    switch (aTimeoutKind) {
      case (MetadataTimeout):
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_meta_timeout_WARNING", new Object[] { anEndpoint });
        }
        status.addEventStatus("GetMeta", "Failed", new UimaASMetaRequestTimeout(
                "UIMA AS Client Timed Out Waiting For GetMeta Reply From a Service On Queue:"
                        + anEndpoint));
        notifyListeners(null, status, AsynchAEMessage.GetMeta);
        abort = true;
        getMetaSemaphore.release();
        break;

      case (PingTimeout):
        // NOTE(review): this branch logs with the GetMeta message key; confirm whether a
        // ping-specific key was intended.
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_meta_timeout_WARNING", new Object[] { anEndpoint });
        }
        status.addEventStatus("Ping", "Failed", new UimaASPingTimeout(
                "UIMA AS Client Timed Out Waiting For Ping Reply From a Service On Queue:"
                        + anEndpoint));
        notifyListeners(null, status, AsynchAEMessage.Ping);
        // The main thread could be stuck waiting for a CAS. Grab any CAS in the
        // client cache and release it so that we can shutdown.
        if (!clientCache.isEmpty()) {
          ClientRequest anyCasRequest = clientCache.elements().nextElement();
          // Read the CAS once so the null check and the release act on the same reference.
          CAS anyCas = anyCasRequest.getCAS();
          if (anyCas != null) {
            anyCas.release();
          }
        }
        abort = true;
        break;

      case (CpCTimeout):
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                  "notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_cpc_timeout_INFO", new Object[] { anEndpoint });
        }
        status.addEventStatus("CpC", "Failed", new UimaASCollectionProcessCompleteTimeout(
                "UIMA AS Client Timed Out Waiting For CPC Reply From a Service On Queue:"
                        + anEndpoint));
        // release the semaphore acquired in collectionProcessingComplete()
        cpcReplySemaphore.release();
        notifyListeners(null, status, AsynchAEMessage.CollectionProcessComplete);
        break;

      case (ProcessTimeout): {
        if (casReferenceId == null) {
          // Nothing to look up without a CAS id.
          break;
        }
        ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
        if (cachedRequest == null) {
          // if missing for any reason ...
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                    "notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_received_expired_msg_INFO",
                    new Object[] { anEndpoint, casReferenceId });
          }
          return;
        }
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_process_timeout_WARNING",
                  new Object[] { anEndpoint, getBrokerURI(),
                      cachedRequest.getHostIpProcessingCAS() });
        }

        // Store the total latency for this CAS. The departure time is set right before the CAS
        // is sent to a service.
        cachedRequest.setTimeWaitingForReply(System.nanoTime()
                - cachedRequest.getCASDepartureTime());

        // mark timeout exception
        cachedRequest.setTimeoutException();

        // Snapshot the invocation mode before the request is removed from the cache below.
        boolean isSynchronousCall = cachedRequest.isSynchronousInvocation();

        if (isSynchronousCall) {
          // Signal the sending thread, if it is registered in the map, so that it can complete
          // processing with an error.
          ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest
                  .getThreadId());
          if (threadMonitor != null) {
            threadMonitor.getMonitor().release();
            cachedRequest.setReceivedProcessCasReply(); // should not be needed
          }
        } else {
          // notify the application listener with the error
          // NOTE(review): exc is not declared in this method; it is assigned as in the original
          // code - confirm it is a member of the enclosing class.
          if (serviceDelegate.isPingTimeout()) {
            exc = new UimaASProcessCasTimeout(new UimaASPingTimeout(
                    "UIMA AS Client Ping Time While Waiting For Reply From a Service On Queue:"
                            + anEndpoint));
            serviceDelegate.resetPingTimeout();
          } else {
            exc = new UimaASProcessCasTimeout("UIMA AS Client Timed Out Waiting For CAS:"
                    + casReferenceId + " Reply From a Service On Queue:" + anEndpoint);
          }
          status.addEventStatus("Process", "Failed", exc);
          notifyListeners(aCAS, status, AsynchAEMessage.Process);
        }

        cachedRequest.removeEntry(casReferenceId);
        serviceDelegate.removeCasFromOutstandingList(casReferenceId);

        // Check if all replies have been received
        long outstandingCasCount = outstandingCasRequests.decrementAndGet();
        if (outstandingCasCount <= 0) {
          cpcReadySemaphore.release();
        }

        // Diagnostic output goes through the logger rather than stdout.
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).log(Level.FINE,
                  "isSynchronousCall=" + isSynchronousCall
                          + " serviceDelegate.getCasPendingReplyListSize()="
                          + serviceDelegate.getCasPendingReplyListSize());
        }

        // For asynchronous invocations with replies still pending, send the oldest outstanding
        // CAS again.
        if (!isSynchronousCall && serviceDelegate.getCasPendingReplyListSize() > 0) {
          String nextOutstandingCasReferenceId = serviceDelegate
                  .getOldestCasIdFromOutstandingList();
          if (nextOutstandingCasReferenceId != null) {
            ClientRequest nextRequest = (ClientRequest) clientCache
                    .get(nextOutstandingCasReferenceId);
            if (nextRequest != null && nextRequest.getCAS() != null) {
              try {
                if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
                  UIMAFramework.getLogger(CLASS_NAME).log(Level.FINE, "Sending CAS Again");
                }
                sendCAS(nextRequest.getCAS());
              } catch (Exception e) {
                // Best effort: a failed re-send is logged and must not abort timeout handling.
                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
                        "notifyOnTimout", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                        "UIMAEE_exception__WARNING", e);
              }
            }
          }
        }
        break;
      }
    } // case
  }