in src/utils/ActiveMQAnalysisEngineService.cpp [887:1085]
// Handles a single incoming request message.
//
// The message is cast to a TextMessage, checked with validateRequest(), and
// then dispatched on its "Command" int property:
//   - PROCESS_CAS_COMMAND: the message text is deserialized into *iv_pCas
//     (XCAS or XMI, selected by the "Payload" int property), the engine's
//     process() is called, and the CAS is serialized back into the reply.
//   - GET_META_COMMAND:    replies with iv_aeDescriptor.
//   - CPC_COMMAND:         calls collectionProcessComplete() on the engine
//     and replies with a fixed completion string.
//
// Timing for each request is reported to iv_pMonitor through
// processingStarted() / processingComplete(). The time at which the request
// finished is stored in iv_timeLastRequestCompleted on every exit path so
// that the idle time of the next request is measured from it.
//
// XMLException, uima::Exception and any other exception thrown inside the
// try block are caught here. Note that sendResponse() is also called from
// within the XMLException and uima::Exception handlers; an exception thrown
// by it there is not caught by this function.
//
// NOTE(review): a command other than the three above, or a payload other
// than XCAS_PAYLOAD / XMI_PAYLOAD, falls through without an error reply;
// presumably validateRequest() rejects such requests - confirm.
//
// @param message  the received message; expected to be a TextMessage.
void AMQListener::handleRequest( const Message* message ) {
  apr_time_t startTime = apr_time_now();
  apr_time_t timeIdle = 0;
  // Idle time is computed as the interval from the time the last request
  // was processed. If this is the first request, idle time is
  // computed from the time the service was started.
  if (iv_timeLastRequestCompleted != 0)
    timeIdle = startTime - iv_timeLastRequestCompleted;
  else
    timeIdle = startTime - iv_pMonitor->getServiceStartTime();
  this->iv_pMonitor->processingStarted(this->iv_id, startTime, timeIdle );

  stringstream astr;
  astr << this->iv_id;
  astr << " ****handleRequest(): "<< iv_count << " start"<< endl;
  LOGINFO(FINE,astr.str());

  // Timing values are declared outside the try block because the catch
  // handlers report them as well.
  apr_time_t endTime = 0;
  apr_time_t startSerialize = 0;
  apr_time_t startDeserialize = 0;
  apr_time_t startAnnotatorProcess = 0;
  apr_time_t startSendResponse = 0;
  apr_time_t timeToDeserializeCAS = 0;
  apr_time_t timeToSerializeCAS = 0;
  apr_time_t timeInAnalytic = 0;
  const TextMessage* textMessage=0;
  int command = 0;

  try {
    textMessage = dynamic_cast< const TextMessage* >( message );
    if (textMessage==0) {
      // Nothing to reply to: only record the failure.
      LOGERROR("AMQListener::handleRequest() invalid pointer to TextMessage");
      endTime = apr_time_now();
      iv_pMonitor->processingComplete(iv_id, 0,false,endTime-startTime,0,0,0,endTime-startTime);
      this->iv_timeLastRequestCompleted = apr_time_now();
      return;
    }
    if (textMessage->propertyExists("MessageFrom")) {
      LOGINFO(FINER,"Received from " + textMessage->getStringProperty("MessageFrom"));
    }

    //validate request properties
    string errormessage;
    if (!validateRequest(textMessage, errormessage)) {
      LOGERROR("Listener::handleRequest() " + errormessage);
      endTime = apr_time_now();
      sendResponse(textMessage, 0,0,0,timeIdle, endTime-startTime,
                   errormessage ,true);
      endTime = apr_time_now();
      iv_pMonitor->processingComplete(iv_id, 0,false,endTime-startTime,0,0,0,endTime-startTime);
      // A rejected request still ends the busy period.
      this->iv_timeLastRequestCompleted = apr_time_now();
      return;
    }

    command = textMessage->getIntProperty("Command");
    astr.str("");
    astr << "Received request Command: " << command ;
    if (textMessage->propertyExists("CasReference")) {
      astr << " CasReference " << textMessage->getStringProperty("CasReference");
    }
    LOGINFO(INFO,astr.str());

    if (command == PROCESS_CAS_COMMAND) { //process CAS
      LOGINFO(FINE,"Process CAS request start.");
      int payload = textMessage->getIntProperty("Payload");

      //get the text in the payload
      string text = textMessage->getText().c_str();
      UnicodeString utext(text.c_str());
      text = UnicodeStringRef(utext).asUTF8();
      astr.str("");
      astr << "Payload: " << payload << " Content: " << text ;
      LOGINFO(FINER, astr.str());

      //InputSource over the payload text
      MemBufInputSource memIS((XMLByte const *)text.c_str(),
                              text.length(),
                              "sysID");

      //reset the CAS
      iv_pCas->reset();
      stringstream xmlstr;

      //deserialize payload data into the CAS,
      //call AE process method and serialize
      //the CAS which will be sent with the
      //response.
      if (payload == XCAS_PAYLOAD) {
        LOGINFO(FINEST, "AMQListener::handleRequest() XCAS serialization.");
        startDeserialize = apr_time_now();
        XCASDeserializer deserializer;
        deserializer.deserialize(memIS, *iv_pCas);
        startAnnotatorProcess=apr_time_now();
        timeToDeserializeCAS = startAnnotatorProcess-startDeserialize;

        iv_pEngine->process(*iv_pCas);
        startSerialize=apr_time_now();
        timeInAnalytic = startSerialize - startAnnotatorProcess;

        XCASWriter xcaswriter(*iv_pCas, true);
        xcaswriter.write(xmlstr);
        timeToSerializeCAS = apr_time_now() - startSerialize;
      } else if (payload == XMI_PAYLOAD) {
        //deserialize incoming xmi CAS data.
        LOGINFO(FINEST, "AMQListener::handleRequest() XMI serialization.");
        startDeserialize = apr_time_now();
        // The same sharedData instance is handed to the deserializer and
        // to the writer below.
        XmiSerializationSharedData sharedData;
        XmiDeserializer deserializer;
        deserializer.deserialize(memIS,*iv_pCas,sharedData);
        startAnnotatorProcess=apr_time_now();
        timeToDeserializeCAS = startAnnotatorProcess-startDeserialize;

        LOGINFO(FINEST, "AMQListener::handleRequest() calling process.");
        iv_pEngine->process(*iv_pCas);
        startSerialize=apr_time_now();
        timeInAnalytic = startSerialize - startAnnotatorProcess;

        //serialize CAS
        LOGINFO(FINEST, "AMQListener::handleRequest() calling serialize.");
        XmiWriter xmiwriter(*iv_pCas, true, &sharedData);
        xmiwriter.write(xmlstr);
        timeToSerializeCAS = apr_time_now() - startSerialize;
        LOGINFO(FINEST, "AMQListener::handleRequest() done processing CAS.");
      }

      //done with this CAS.
      iv_pCas->reset();

      //record end time; the send-response interval starts here, as in the
      //getMeta and CPC branches.
      endTime = apr_time_now();
      startSendResponse = apr_time_now();

      //send reply
      LOGINFO(FINER,"AnalysisEngine::process() completed successfully. Sending reply.");
      sendResponse(textMessage, timeToDeserializeCAS,
                   timeToSerializeCAS, timeInAnalytic,
                   timeIdle, endTime-startTime,
                   xmlstr.str(),false);
      endTime=apr_time_now();
      iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime,
                                      timeToDeserializeCAS, timeInAnalytic, timeToSerializeCAS,
                                      endTime-startSendResponse);
      LOGINFO(FINE,"Process CAS finished.");
    } else if (command == GET_META_COMMAND ) { //get Meta
      LOGINFO(FINE, "Process getMeta request start.");
      endTime = apr_time_now();
      startSendResponse = apr_time_now();
      sendResponse(textMessage, timeToDeserializeCAS,
                   timeToSerializeCAS, timeInAnalytic,
                   timeIdle, endTime-startTime,this->iv_aeDescriptor,false);
      endTime=apr_time_now();
      //record timing
      iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime,0,0,0,endTime-startSendResponse);
      LOGINFO(FINE,"Process getMeta request finished.");
    } else if (command == CPC_COMMAND ) { //CPC
      LOGINFO(FINE, "Processing CollectionProcessComplete request start");
      iv_pEngine->collectionProcessComplete();
      endTime = apr_time_now();
      startSendResponse = apr_time_now();
      sendResponse(textMessage, 0,
                   0, timeInAnalytic,
                   timeIdle,endTime-startTime,"CPC completed.",false);
      endTime=apr_time_now();
      iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime,0,0,0,endTime-startSendResponse);
      LOGINFO(FINE, "Processing CollectionProcessComplete request finished.");
    }
  } catch (XMLException& e) {
    // NOTE(review): in Xerces-C++ XMLException::getMessage() returns a
    // const XMLCh*; streamed into a narrow stringstream this presumably
    // prints a pointer value rather than the message text - confirm and
    // transcode if so.
    stringstream str;
    str << "AMQListener::handleRequest XMLException." << e.getMessage();
    LOGERROR(str.str());
    endTime = apr_time_now();
    startSendResponse = apr_time_now();
    sendResponse(textMessage, timeToDeserializeCAS,
                 timeToSerializeCAS, timeInAnalytic,
                 timeIdle, endTime-startTime,str.str(),true);
    endTime = apr_time_now();
    iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startSendResponse);
  } catch (uima::Exception& e) {
    // Caught by reference so the exception object is neither copied nor
    // sliced.
    LOGERROR("AMQListener::handleRequest UIMA Exception " + e.asString());
    endTime = apr_time_now();
    startSendResponse = apr_time_now();
    sendResponse(textMessage, timeToDeserializeCAS,
                 timeToSerializeCAS, timeInAnalytic,
                 timeIdle,endTime-startTime,e.asString(),true);
    endTime = apr_time_now();
    iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startSendResponse);
  } catch(...) {
    // No reply is sent for an unknown exception; only the failure is
    // recorded.
    LOGERROR("AMQListener::handleRequest Unknown exception ");
    //TODO: log / shutdown?
    endTime = apr_time_now();
    iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startTime);
  }

  // Remember when this request finished so the next call can compute its
  // idle time from it (see the top of this function).
  this->iv_timeLastRequestCompleted = apr_time_now();
}