in src/utils/ActiveMQAnalysisEngineService.cpp [1261:1418]
void AMQAnalysisEngineService::initialize(ServiceParameters & params) {
try {
//create connection factory
LOGINFO(FINEST,"AMQAnalysisEngineService::initialize() Create connection factory");
this->iv_pConnFact = AMQConnection::createConnectionFactory(params);
if (iv_pConnFact == NULL) {
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQAnalysisEngineService::initialize() Failed to create connection factory.");
ErrorInfo errInfo;
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
if (!uima::ResourceManager::hasInstance()) {
uima::ResourceManager::createInstance("ActiveMQAnalysisEngineService");
}
ErrorInfo errInfo;
UnicodeString ustr(this->iv_aeDescriptor.c_str());
UnicodeString ufn = ResourceManager::resolveFilename(ustr,ustr);
//create a AnalysisEngine and CAS for each instance
for (int i=0; i < iv_numInstances; i++) {
//create the connection
AMQConnection * newConnection = new AMQConnection(this->iv_pConnFact, params.getBrokerURL(), this->iv_pMonitor, i);
if (newConnection == NULL) {
LOGERROR("AMQAnalysisEngineService::initialize() Could not create ActiveMQ endpoint connection.");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQAnalysisEngineService::initialize() Failed to connect to broker.");
ErrorInfo errInfo;
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
/////newConnection->setExceptionListener(this);
this->iv_vecpConnections.push_back(newConnection);
//create AE
AnalysisEngine * pEngine = uima::TextAnalysisEngine::createTextAnalysisEngine((UnicodeStringRef(ufn).asUTF8().c_str()),
errInfo);
if (pEngine) {
LOGINFO(0,"AMQAnalysisEngineService::initialize() AnalysisEngine initialization successful.");
} else {
LOGERROR("AMQAnalysisEngineService::initializer() could not create AE" + errInfo.asString());
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQAnalysisEngineService::initialize() create AE failed. " + errInfo.getMessage().asString() );
ErrorInfo errInfo;
errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE),
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
this->iv_vecpAnalysisEngines.push_back(pEngine);
//initial FSHeap size
if (this->iv_initialFSHeapSize > 0) {
pEngine->getAnnotatorContext().
assignValue(UIMA_ENGINE_CONFIG_OPTION_FSHEAP_PAGESIZE,this->iv_initialFSHeapSize);
}
//create CAS
CAS * cas = pEngine->newCAS();
if (cas == NULL) {
LOGERROR("AMQAnalysisEngineService::initialize() Could not create CAS");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQListener::initialize() create CAS failed.");
ErrorInfo errInfo;
errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE),
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
this->iv_vecpCas.push_back(cas);
//cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() create listener " << endl;
//create listeners and register these
AMQListener * newListener = new AMQListener(i,iv_vecpConnections.at(i),
iv_vecpAnalysisEngines.at(i), iv_vecpCas.at(i),
iv_pMonitor);
if (newListener == NULL) {
LOGERROR("AMQAnalysisEngineService::initialize() Could not create listener.");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AnalysisEngineService::initialize() Could not create listener.");
ErrorInfo errInfo;
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
this->iv_listeners[i] = newListener;
//cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() create message consumer " << endl;
//create MessageConsumer session and register Listener
this->iv_vecpConnections.at(i)->createMessageConsumer(iv_inputQueueName,annotator_selector, params.getPrefetchSize());
}
iv_pMonitor->setNumberOfInstances(iv_numInstances);
//cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() setup getmeta " << endl;
//Fast GetMeta
//create connection
LOGINFO(FINEST, "AMQAnalysisEngineService::initialize() Setup GETMETA instance.");
iv_pgetMetaConnection = new AMQConnection(this->iv_pConnFact, params.getBrokerURL(), this->iv_pMonitor, iv_numInstances);
if (iv_pgetMetaConnection == NULL) {
LOGERROR("AMQAnalysisEngineService::initialize() Could not create fast getmeta ActiveMQ endpoint connection.");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQAnalysisEngineService::initialize() Failed to connect to broker.");
ErrorInfo errInfo;
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
//cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() setup getmeta listener" << endl;
//create a MessageListener MessageConsumer to handle getMeta requests only.
AMQListener * newListener = new AMQListener(iv_numInstances,iv_pgetMetaConnection,
iv_vecpAnalysisEngines.at(0),
this->iv_pMonitor);
if (newListener == NULL) {
LOGERROR("AMQAnalysisEngineService::initialize() Could not create Fast getMeta listener.");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQAnalysisEngineService::initialize() Could not create listener.");
ErrorInfo errInfo;
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
this->iv_listeners[this->iv_numInstances] = newListener;
iv_pgetMetaConnection->createMessageConsumer(iv_inputQueueName,
getmeta_selector,0);
this->iv_pMonitor->setGetMetaListenerId(iv_numInstances);
//cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() done" << endl;
} catch (uima::Exception e) {
cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() failed " << e.getErrorInfo().asString() << endl;
LOGERROR("AMQAnalysisEngineService::initialize() " + e.asString());
throw e;
}
}