protected void handleMetadataReply()

in uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java [1100:1234]


  /**
   * Handles a reply to a GetMeta request sent to the service.
   * <p>
   * The delegate's GetMeta timer is cancelled and its state is set to
   * {@code Delegate.OK_STATE}. If the message carries a JMS reply-to destination, it is stored as
   * the delegate's free CAS destination. The reply is then treated in one of two ways:
   * <ul>
   * <li><b>Ping reply</b> (the delegate is awaiting a ping reply): the awaiting flag is reset. If
   * there are CASes pending reply or pending dispatch, the timer for the oldest outstanding CAS is
   * restarted and CASes are taken off the pending dispatch list and sent for as long as the
   * delegate state remains {@code Delegate.OK_STATE}. Otherwise listeners are notified with an
   * empty status. The method then returns; {@code getMetaSemaphore} is not released on this
   * path.</li>
   * <li><b>Regular GetMeta reply</b>: {@code removeFromCache(uniqueIdentifier)} is called. An
   * exception payload is reported to listeners and sets {@code abort = true} and
   * {@code initialized = false}. Any other payload is expected to be a {@link TextMessage} holding
   * the service metadata as XML: the client serialization format is reconciled with the one
   * reported by the service, the metadata is parsed into {@code resourceMetadata} and, if the
   * client is still running, merged via {@code asynchManager}. {@code getMetaSemaphore} is
   * released whether or not this processing succeeds.</li>
   * </ul>
   *
   * @param message
   *          the JMS message containing the GetMeta reply
   * @throws Exception
   *           any failure raised while reading the message or parsing the metadata is propagated
   *           to the caller
   */
  protected void handleMetadataReply(Message message) throws Exception {

    serviceDelegate.cancelDelegateGetMetaTimer();
    serviceDelegate.setState(Delegate.OK_STATE);
    // check if the reply msg contains replyTo destination. It will be
    // added by the Cas Multiplier to the getMeta reply
    if (message.getJMSReplyTo() != null) {
      serviceDelegate.setFreeCasDestination(message.getJMSReplyTo());
    }
    // Check if this is a reply for a Ping sent in response to a timeout
    if (serviceDelegate.isAwaitingPingReply()) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.INFO,
                CLASS_NAME.getName(),
                "handleMetadataReply",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_rcvd_ping_reply__INFO",
                new Object[] {
                  message.getStringProperty(AsynchAEMessage.MessageFrom),
                  message.getStringProperty(AsynchAEMessage.ServerIP)});
      }
      // reset the state of the service. The client received its ping reply
      serviceDelegate.resetAwaitingPingReply();
      if (serviceDelegate.getCasPendingReplyListSize() > 0
              || serviceDelegate.getCasPendingDispatchListSize() > 0) {
        serviceDelegate.restartTimerForOldestCasInOutstandingList();
        // We got a reply to GetMeta ping. Send all CASes that have been
        // placed on the pending dispatch queue to a service. The delegate state
        // is re-checked before each removal, so dispatching stops as soon as the
        // state is no longer OK.
        String casReferenceId = null;
        while (serviceDelegate.getState() == Delegate.OK_STATE
                && (casReferenceId = serviceDelegate.removeOldestFromPendingDispatchList()) != null) {
          ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
          if (cachedRequest != null) {
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(
                      Level.INFO,
                      CLASS_NAME.getName(),
                      "handleMetadataReply",
                      JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAJMS_dispatch_delayed_cas__INFO",
                      new Object[] { casReferenceId, String.valueOf(cachedRequest.cas.hashCode())});
            }
            sendCAS(cachedRequest.getCAS(), cachedRequest, null);
          }
        }
      } else {
        // Nothing is outstanding: report the GetMeta (ping) reply to listeners
        // with an empty status.
        ProcessTrace pt = new ProcessTrace_impl();
        UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
        notifyListeners(null, status, AsynchAEMessage.GetMeta);
      }
      // Handled Ping reply
      return;
    }
    // NOTE(review): the two statements below run outside the try/finally, so an
    // exception thrown here does not release getMetaSemaphore - confirm this is intended.
    int payload = message.getIntProperty(AsynchAEMessage.Payload);
    removeFromCache(uniqueIdentifier);

    try {
      if (AsynchAEMessage.Exception == payload) {
        ProcessTrace pt = new ProcessTrace_impl();
        UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
        Exception exception = retrieveExceptionFromMessage(message);
        clientSideJmxStats.incrementMetaErrorCount();
        status.addEventStatus("GetMeta", "Failed", exception);
        notifyListeners(null, status, AsynchAEMessage.GetMeta);
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.INFO,
                  CLASS_NAME.getName(),
                  "handleMetadataReply",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_received_exception_msg_INFO",
                  new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
                      getBrokerURI(),
                      message.getStringProperty(AsynchAEMessage.CasReference), exception });
        }
        abort = true;
        initialized = false;
      } else {
        // Check serialization supported by the service against client configuration.
        // If the client is configured to use Binary serialization *but* the service
        // doesn't support it, change the client serialization to xmi. Old services will
        // not return in a reply the type of serialization supported which implies "xmi".
        // New services *always* return "binary" or "compressedBinaryXXX"
        // as a default serialization. The client
        // however may still want to serialize messages using xmi though.
        if (!message.propertyExists(AsynchAEMessage.SERIALIZATION)) {
          // Dealing with an old service here, check if there is a mismatch with the
          // client configuration. If the client is configured with binary serialization
          // override this and change serialization to "xmi".
          if (getSerialFormat() != SerialFormat.XMI) {
            // Override configured serialization
            setSerialFormat(SerialFormat.XMI);
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                      "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAJMS_client_serialization_ovveride__WARNING", new Object[] {});
            }
          }
        } else {
          final int serviceSerialization = message.getIntProperty(AsynchAEMessage.SERIALIZATION);
          if (getSerialFormat() != SerialFormat.XMI) {
            // don't override if XMI - because the remote may have different type system.
            // Any value other than XMI or BINARY is mapped to COMPRESSED_FILTERED.
            setSerialFormat(
                    (serviceSerialization == AsynchAEMessage.XMI_SERIALIZATION) ? SerialFormat.XMI
                    : (serviceSerialization == AsynchAEMessage.BINARY_SERIALIZATION) ? SerialFormat.BINARY
                    : SerialFormat.COMPRESSED_FILTERED);
          }
        }
        // The metadata arrives as the XML text of the reply; parse it from UTF-8 bytes.
        String meta = ((TextMessage) message).getText();
        ByteArrayInputStream bis = new ByteArrayInputStream(meta.getBytes("UTF-8"));
        XMLInputSource metadataSource = new XMLInputSource(bis, null);
        // Adam - store ResourceMetaData in field so we can return it from getMetaData().
        resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser()
                .parseResourceMetaData(metadataSource);
        // if remote delegate, save type system
        if (!brokerURI.startsWith("vm:")) { // test if remote
          setRemoteTypeSystem(AggregateAnalysisEngineController_impl.getTypeSystemImpl(resourceMetadata));
        }
        casMultiplierDelegate = resourceMetadata.getOperationalProperties().getOutputsNewCASes();
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                  "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_handling_meta_reply_FINEST",
                  new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), meta });
        }
        // check the state of the client
        if (running && asynchManager != null) {
          // Merge the metadata only if the client is still running
          asynchManager.addMetadata(resourceMetadata);
        }
      }
    } finally {
      // Always release the semaphore, on success or failure of the processing above.
      getMetaSemaphore.release();
    }
  }