void AMQListener::handleRequest()

in src/utils/ActiveMQAnalysisEngineService.cpp [887:1085]


  void AMQListener::handleRequest( const Message* message ) {


    apr_time_t startTime = apr_time_now();
    apr_time_t timeIdle = 0;
    // Idle time is computed as the interval from time last request 
    // was processed. If this is the first request, idle time is 
    // computed from the time the service was started.  
    if (iv_timeLastRequestCompleted != 0)
      timeIdle =  startTime - iv_timeLastRequestCompleted;
    else 
      timeIdle = startTime - iv_pMonitor->getServiceStartTime(); 

    this->iv_pMonitor->processingStarted(this->iv_id, startTime, timeIdle );
    stringstream astr;
    astr << this->iv_id;
    astr << " ****handleRequest(): "<< iv_count << " start"<< endl;
    LOGINFO(FINE,astr.str());
    ///cout << astr.str() << endl;
    ///LOGWARN(astr.str());
    apr_time_t endTime = 0;
    apr_time_t startSerialize = 0;
    apr_time_t startDeserialize = 0;
    apr_time_t startAnnotatorProcess = 0; 
    apr_time_t startSendResponse = 0;
    apr_time_t timeToDeserializeCAS = 0;
    apr_time_t timeToSerializeCAS = 0;
    apr_time_t timeInAnalytic = 0;

    const TextMessage* textMessage=0;
    int command = 0;

    try {	
    
      textMessage = dynamic_cast< const TextMessage* >( message );
      if (textMessage==0) {
        LOGERROR("AMQListener::handleRequest() invalid pointer to TextMessage");
        endTime = apr_time_now();
        iv_pMonitor->processingComplete(iv_id, 0,false,endTime-startTime,0,0,0,endTime-startTime);
        this->iv_timeLastRequestCompleted = apr_time_now();
        return;
      }
      if (textMessage->propertyExists("MessageFrom")) {	
        LOGINFO(FINER,"Received from " + textMessage->getStringProperty("MessageFrom"));
      } 

      //validate request properties
      string errormessage;
      if (!validateRequest(textMessage, errormessage)) {
        LOGERROR("Listener::handleRequest() " + errormessage);
        endTime = apr_time_now();
        sendResponse(textMessage, 0,0,0,timeIdle, endTime-startTime,
          errormessage ,true);
        endTime = apr_time_now();
        iv_pMonitor->processingComplete(iv_id, 0,false,endTime-startTime,0,0,0,endTime-startTime);
        return;
      }

      command = textMessage->getIntProperty("Command");
      astr.str("");
      astr << "Received request Command: " << command ;
      if (textMessage->propertyExists("CasReference")) {
        astr << " CasReference " << textMessage->getStringProperty("CasReference");
      }
   
      astr << "Received request Command: " << command ;
      if (textMessage->propertyExists("CasReference")) {
        astr << " CasReference " << textMessage->getStringProperty("CasReference");
      }
      LOGINFO(INFO,astr.str());

      if (command == PROCESS_CAS_COMMAND) { //process CAS
        LOGINFO(FINE,"Process CAS request start.");
        int payload = textMessage->getIntProperty("Payload");
        //get the text in the payload 
        string text = textMessage->getText().c_str();
        UnicodeString utext(text.c_str());
		text = UnicodeStringRef(utext).asUTF8();
        astr.str("");
        astr << "Payload: " << payload << " Content: " << text ;
        LOGINFO(FINER, astr.str());

        //InputSource
        MemBufInputSource memIS((XMLByte const *)text.c_str(),
          text.length(),
          "sysID");

        //reset the CAS
        iv_pCas->reset();
        ios::openmode mode = ios::binary;
        stringstream xmlstr;  
        //deserialize payload data into the CAS,
        //call AE process method and serialize
        //the CAS which will be sent with the
        //response.
        if (payload == XCAS_PAYLOAD) {
          LOGINFO(FINEST, "AMQListener::handleRequest() XCAS serialization.");
          startDeserialize = apr_time_now();
          XCASDeserializer deserializer;
          deserializer.deserialize(memIS, *iv_pCas); 
          startAnnotatorProcess=apr_time_now();
          timeToDeserializeCAS = startAnnotatorProcess-startDeserialize;

          iv_pEngine->process(*iv_pCas);
          startSerialize=apr_time_now();
          timeInAnalytic = startSerialize - startAnnotatorProcess;

          XCASWriter xcaswriter(*iv_pCas, true);
          xcaswriter.write(xmlstr);
          timeToSerializeCAS = apr_time_now() - startSerialize;
        } else if (payload == XMI_PAYLOAD) {
          //deserialize incoming xmi CAS data.
          LOGINFO(FINEST, "AMQListener::handleRequest() XMI serialization.");
          
          startDeserialize = apr_time_now();
          XmiSerializationSharedData sharedData;
          XmiDeserializer deserializer;
          deserializer.deserialize(memIS,*iv_pCas,sharedData);
          startAnnotatorProcess=apr_time_now();
          timeToDeserializeCAS = startAnnotatorProcess-startDeserialize;
          LOGINFO(FINEST, "AMQListener::handleRequest() calling process.");
          iv_pEngine->process(*iv_pCas);
          startSerialize=apr_time_now();
          timeInAnalytic = startSerialize - startAnnotatorProcess;

          //serialize CAS 
          LOGINFO(FINEST, "AMQListener::handleRequest() calling serialize.");
          XmiWriter xmiwriter(*iv_pCas, true, &sharedData);
          xmiwriter.write(xmlstr);
          //cout << "SERIALIZED CAS " << xmlstr.str() << endl;
          timeToSerializeCAS = apr_time_now() - startSerialize;
          LOGINFO(FINEST, "AMQListener::handleRequest() done processing CAS.");
        }
        //done with this CAS.
        iv_pCas->reset(); 
        //record end time
        endTime = apr_time_now();
        //send reply
        LOGINFO(FINER,"AnalysisEngine::process() completed successfully. Sending reply.");
        sendResponse(textMessage, timeToDeserializeCAS,
                      timeToSerializeCAS, timeInAnalytic,
                      timeIdle, endTime-startTime,
                      xmlstr.str(),false);
        endTime=apr_time_now();
        iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime,
          timeToDeserializeCAS, timeInAnalytic, timeToSerializeCAS,
          endTime-startSendResponse);
        LOGINFO(FINE,"Process CAS finished.");		
      } else if (command ==  GET_META_COMMAND ) { //get Meta 	 
        LOGINFO(FINE, "Process getMeta request start.");
        endTime = apr_time_now();
        startSendResponse = apr_time_now();
        sendResponse(textMessage, timeToDeserializeCAS,
                  timeToSerializeCAS, timeInAnalytic,
                  timeIdle, endTime-startTime,this->iv_aeDescriptor,false);			
        endTime=apr_time_now();
        //record timing
        iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime,0,0,0,endTime-startSendResponse);
        LOGINFO(FINE,"Process getMeta request finished.");			
      } else if (command ==  CPC_COMMAND ) { //CPC       			
        LOGINFO(FINE, "Processing CollectionProcessComplete request start");				
        iv_pEngine->collectionProcessComplete();
        endTime = apr_time_now();
        startSendResponse = apr_time_now();
        sendResponse(textMessage, 0,
                    0, timeInAnalytic,
                    timeIdle,endTime-startTime,"CPC completed.",false);
        endTime=apr_time_now();
        iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime,0,0,0,endTime-startSendResponse);
        LOGINFO(FINE, "Processing CollectionProcessComplete request finished.");	
      } 
    } catch (XMLException& e) {
      stringstream str;
      str << "AMQListener::handleRequest XMLException." << e.getMessage();
      LOGERROR(str.str());
      endTime = apr_time_now();
      startSendResponse = apr_time_now();
      sendResponse(textMessage, timeToDeserializeCAS,
                    timeToSerializeCAS, timeInAnalytic,
                    timeIdle, endTime-startTime,str.str(),true);
      endTime = apr_time_now();
      iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startSendResponse);
    } catch (uima::Exception e) {
      LOGERROR("AMQListener::handleRequest UIMA Exception " + e.asString());
      endTime = apr_time_now();
      startSendResponse = apr_time_now();
      sendResponse(textMessage, timeToDeserializeCAS,
                      timeToSerializeCAS, timeInAnalytic,
                      timeIdle,endTime-startTime,e.asString(),true);
      endTime = apr_time_now();
      iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startSendResponse);
    }	catch(...) {
      LOGERROR("AMQListener::handleRequest Unknown exception ");
      //TODO: log / shurdown ?}   
      endTime = apr_time_now(); 
      iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startTime);

    }
  }