in src/utils/ActiveMQAnalysisEngineService.cpp [1087:1220]
void AMQListener::sendResponse(const TextMessage * request,
apr_time_t timeToDeserialize,
apr_time_t timeToSerialize,
apr_time_t timeInAnalytic,
apr_time_t idleTime,
apr_time_t elapsedTime,
string msgContent, bool isExceptionMsg) {
//TODO retry
string serverURI;
AMQConnection * replyTo=NULL;
TextMessage * reply=NULL;
try {
const Destination * cmsReplyTo = request->getCMSReplyTo();
if (cmsReplyTo != NULL) {
reply = this->iv_pConnection->getTextMessage();
} else {
LOGINFO(2, "AMQListener::sendResponse() start");
if (!request->propertyExists("ServerURI") ) {
LOGERROR("AMQListener::sendResponse() ServerURI header property does not exist.");
return;
}
if (!request->propertyExists("MessageFrom") ) {
LOGERROR("AMQListener::sendResponse() MessageFrom header property does not exist.");
return;
}
//get handle to a connection
string tmp = request->getStringProperty("ServerURI");
LOGINFO(FINEST,"replyTo ServerURI: " + tmp);
//special handling when protocol is http.
//HTTP protocol not supported by ActiveMQ C++ client.
//replace the http broker URL with the broker URL of
//the queue this listener is attached to.
if (tmp.find("http:") != string::npos) {
serverURI=this->iv_brokerURL;
LOGINFO(FINER,"HTTP reply address: " + tmp);
} else { //send reply via tcp
size_t begpos = tmp.find("tcp:",0);
tmp = tmp.substr(begpos);
size_t endpos = tmp.find_first_of(",");
if (begpos == string::npos) {
LOGERROR("AMQListener::sendResponse() Could not find tcp URL in ServerURI header property.");
return;
}
if (endpos == string::npos) {
serverURI = tmp;
} else {
serverURI = tmp.substr(0, endpos);
}
}
LOGINFO(FINER,"ReplyTo BrokerURL " + serverURI);
//look up the endpoint connection
replyTo = this->iv_replyToConnections.getConnection(serverURI);
if (replyTo == NULL) {
LOGERROR("Could not get a connection to " + serverURI);
return;
}
//get message object
reply = replyTo->getTextMessage();
}
if (reply == NULL) {
LOGERROR("AMQListener::sendResponse() invalid textMessage object " );
return;
}
//construct the reply message
reply->setStringProperty("MessageFrom", this->iv_pConnection->getInputQueueName() );
reply->setStringProperty("ServerURI", this->iv_brokerURL);
reply->setIntProperty("MessageType",RESPONSE);
reply->setLongProperty("TimeInService", elapsedTime*1000);
reply->setLongProperty("IdleTime", idleTime*1000);
if (request->propertyExists("CasReference") ) {
reply->setStringProperty("CasReference", request->getStringProperty("CasReference"));
}
if (request->propertyExists("Command") ) {
reply->setIntProperty("Command", request->getIntProperty("Command"));
if (request->getIntProperty("Command") == PROCESS_CAS_COMMAND) {
reply->setLongProperty("TimeToSerializeCAS", timeToSerialize*1000);
reply->setLongProperty("TimeToDeserializeCAS", timeToDeserialize*1000);
reply->setLongProperty("TimeInAnalytic", timeInAnalytic*1000);
reply->setLongProperty("TimeInProcessCAS", timeInAnalytic*1000);
}
}
if (isExceptionMsg) {
reply->setIntProperty("Payload",EXC_PAYLOAD);
} else {
if (request->propertyExists("Payload") ) {
reply->setIntProperty("Payload",request->getIntProperty("Payload"));
} else {
reply->setIntProperty("Payload",NO_PAYLOAD);
}
}
//cargo
reply->setText(msgContent);
//log the reply message content
stringstream str;
str << "Sending Reply Command: " << reply->getIntProperty("Command") << " MessageType: " << reply->getIntProperty("MessageType") << " ";
if (reply->propertyExists("CasReference") ) {
str << "CasReference: " << reply->getStringProperty("CasReference");
}
LOGINFO(INFO,str.str());
if (cmsReplyTo != NULL) {
str << " to " << ((Queue*)cmsReplyTo)->getQueueName();
} else {
str << " to " << request->getStringProperty("MessageFrom")
<< " at " << serverURI;
}
str << " Message text: " << msgContent;
//std::cout << "****Reply" << msgContent << std::endl;
LOGINFO(FINEST,"PRINT Reply message:\n" + str.str());
//send
if (cmsReplyTo != NULL) {
//cout << "cmsReplyTo=" << cmsReplyTo->toProviderString() << endl;
iv_pConnection->sendMessage(cmsReplyTo);
} else {
replyTo->sendMessage(request->getStringProperty("MessageFrom"));
}
LOGINFO(FINER,"AMQListener::sendResponse DONE");
} catch (CMSException& ex ) {
LOGERROR("AMQListener::handleMessage()" + ex.getMessage());
} catch (...) {
LOGERROR("AMQListener::handleRequest() UnExpected error sending reply.");
}
}