void AMQListener::sendResponse()

in src/utils/ActiveMQAnalysisEngineService.cpp [1087:1220]


  void AMQListener::sendResponse(const TextMessage * request, 
                               apr_time_t timeToDeserialize,
                               apr_time_t timeToSerialize, 
                               apr_time_t timeInAnalytic, 
                               apr_time_t idleTime,
                               apr_time_t elapsedTime, 
                               string msgContent, bool isExceptionMsg) {

     //TODO retry
     string serverURI;
     AMQConnection * replyTo=NULL;
     TextMessage * reply=NULL;

     try {
       const Destination * cmsReplyTo = request->getCMSReplyTo();
       if (cmsReplyTo !=  NULL)  {
         reply = this->iv_pConnection->getTextMessage();
       } else {
         LOGINFO(2, "AMQListener::sendResponse() start");
         if (!request->propertyExists("ServerURI")  ) {
           LOGERROR("AMQListener::sendResponse() ServerURI header property does not exist.");
           return;
         }
         if (!request->propertyExists("MessageFrom")  ) {
           LOGERROR("AMQListener::sendResponse() MessageFrom header property does not exist.");
           return;
         }

         //get handle to a connection
         string tmp = request->getStringProperty("ServerURI");
         LOGINFO(FINEST,"replyTo ServerURI: " +  tmp);

         //special handling when protocol is http.
         //HTTP protocol not supported by ActiveMQ C++ client.
         //replace the http broker URL with the broker URL of
         //the queue this listener is attached to.
         if (tmp.find("http:") != string::npos) { 
           serverURI=this->iv_brokerURL;
           LOGINFO(FINER,"HTTP reply address: " + tmp);
         } else {  //send reply via tcp
           size_t begpos = tmp.find("tcp:",0);
           tmp = tmp.substr(begpos);
           size_t endpos = tmp.find_first_of(",");

           if (begpos == string::npos) { 
             LOGERROR("AMQListener::sendResponse() Could not find tcp URL in ServerURI header property.");	 
             return;
           }
           if (endpos == string::npos) {
             serverURI = tmp;
           } else {
             serverURI = tmp.substr(0, endpos);
           }
         }
         LOGINFO(FINER,"ReplyTo BrokerURL " + serverURI);

         //look up the endpoint connection
         replyTo = this->iv_replyToConnections.getConnection(serverURI);
         if (replyTo == NULL) {
           LOGERROR("Could not get a connection to " + serverURI);
           return;
         }
         //get message object
         reply = replyTo->getTextMessage();
       }

       if (reply == NULL) {
         LOGERROR("AMQListener::sendResponse() invalid textMessage object " );
         return;
       }

       //construct the reply message
       reply->setStringProperty("MessageFrom", this->iv_pConnection->getInputQueueName() );
       reply->setStringProperty("ServerURI", this->iv_brokerURL);

       reply->setIntProperty("MessageType",RESPONSE);
       reply->setLongProperty("TimeInService", elapsedTime*1000);
       reply->setLongProperty("IdleTime", idleTime*1000);
       if (request->propertyExists("CasReference")  ) {
         reply->setStringProperty("CasReference", request->getStringProperty("CasReference"));
       }
       if (request->propertyExists("Command")  ) {
         reply->setIntProperty("Command", request->getIntProperty("Command"));
         if (request->getIntProperty("Command") == PROCESS_CAS_COMMAND) {
           reply->setLongProperty("TimeToSerializeCAS", timeToSerialize*1000);
           reply->setLongProperty("TimeToDeserializeCAS", timeToDeserialize*1000);
           reply->setLongProperty("TimeInAnalytic", timeInAnalytic*1000);
           reply->setLongProperty("TimeInProcessCAS", timeInAnalytic*1000);
         }
       }
       if (isExceptionMsg) {
         reply->setIntProperty("Payload",EXC_PAYLOAD);
       } else {
         if (request->propertyExists("Payload") ) {
           reply->setIntProperty("Payload",request->getIntProperty("Payload"));
         } else {
           reply->setIntProperty("Payload",NO_PAYLOAD);
         }
       }
       //cargo
       reply->setText(msgContent);	
       //log the reply message content
       stringstream str;
       str << "Sending Reply Command: " << reply->getIntProperty("Command") << " MessageType: " << reply->getIntProperty("MessageType") << " ";
       if (reply->propertyExists("CasReference") ) {
         str << "CasReference: " << reply->getStringProperty("CasReference");
       }
       LOGINFO(INFO,str.str());

       if (cmsReplyTo != NULL) {
         str << " to " << ((Queue*)cmsReplyTo)->getQueueName();
       } else {
         str << " to " << request->getStringProperty("MessageFrom") 
           << " at " << serverURI;
       }
       str << " Message text: " << msgContent;
       //std::cout << "****Reply" << msgContent << std::endl;
       LOGINFO(FINEST,"PRINT Reply message:\n" + str.str());	

       //send
       if (cmsReplyTo != NULL) {
         //cout << "cmsReplyTo=" << cmsReplyTo->toProviderString() << endl;
         iv_pConnection->sendMessage(cmsReplyTo);
       } else {
         replyTo->sendMessage(request->getStringProperty("MessageFrom"));
       }
       LOGINFO(FINER,"AMQListener::sendResponse DONE");

     } catch (CMSException& ex ) {
       LOGERROR("AMQListener::handleMessage()" +  ex.getMessage());
     } catch (...)  { 
       LOGERROR("AMQListener::handleRequest() UnExpected error sending reply.");
     }
}