void AMQAnalysisEngineService::initialize()

in src/utils/ActiveMQAnalysisEngineService.cpp [1261:1418]


  /**
   * Creates every messaging and analysis resource this service needs.
   *
   * Steps, in order:
   *  - create the ActiveMQ connection factory from params;
   *  - create the uima::ResourceManager instance if none exists yet;
   *  - for each of the iv_numInstances instances: open a broker connection,
   *    create an AnalysisEngine from iv_aeDescriptor, create a CAS from that
   *    engine, create an AMQListener bound to the three, and create a message
   *    consumer on iv_inputQueueName filtered by annotator_selector;
   *  - create one additional connection/listener pair, registered under id
   *    iv_numInstances, whose consumer is filtered by getmeta_selector.
   *
   * Objects created before a failure stay registered in the member
   * containers; this function does not release them.
   *
   * @param params  passed to the connection factory; also supplies the broker
   *                URL and the prefetch size used for the per-instance
   *                consumers.
   * @throws uima::Uima_runtime_error with
   *         UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE when one of the
   *         resources cannot be created. Any uima::Exception raised inside is
   *         logged and then rethrown with its original dynamic type.
   */
  void AMQAnalysisEngineService::initialize(ServiceParameters & params) {

    try {
      //create connection factory
      LOGINFO(FINEST,"AMQAnalysisEngineService::initialize() Create connection factory");
      this->iv_pConnFact = AMQConnection::createConnectionFactory(params);

      if (iv_pConnFact == NULL) {
        ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
        msg.addParam("AMQAnalysisEngineService::initialize() Failed to create connection factory.");
        ErrorInfo errInfo;
        errInfo.setMessage(msg);
        UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
          UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
          errInfo.getMessage(),
          errInfo.getMessage().getMessageID(),
          ErrorInfo::unrecoverable);
      }

      if (!uima::ResourceManager::hasInstance()) {
        uima::ResourceManager::createInstance("ActiveMQAnalysisEngineService");
      }

      // Receives the error details of createTextAnalysisEngine(); the failure
      // branches below declare their own, shadowing ErrorInfo for the throw.
      ErrorInfo errInfo;
      UnicodeString ustr(this->iv_aeDescriptor.c_str());
      UnicodeString ufn = ResourceManager::resolveFilename(ustr,ustr);

      //create a AnalysisEngine and CAS for each instance
      for (int i=0; i < iv_numInstances; i++) {
        //create the connection
        AMQConnection * newConnection = new AMQConnection(this->iv_pConnFact, params.getBrokerURL(), this->iv_pMonitor, i);
        // NOTE: a standard-conforming plain `new` reports failure by throwing
        // std::bad_alloc, so this branch is only reachable with a
        // non-conforming allocator. Kept as a defensive check.
        if (newConnection == NULL) {
          LOGERROR("AMQAnalysisEngineService::initialize() Could not create ActiveMQ endpoint connection.");
          ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
          msg.addParam("AMQAnalysisEngineService::initialize() Failed to connect to broker.");
          ErrorInfo errInfo;
          errInfo.setMessage(msg);
          UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
            UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
            errInfo.getMessage(),
            errInfo.getMessage().getMessageID(),
            ErrorInfo::unrecoverable);
        }
        // An exception listener is intentionally not registered here:
        /////newConnection->setExceptionListener(this);
        this->iv_vecpConnections.push_back(newConnection);

        //create AE
        AnalysisEngine * pEngine = uima::TextAnalysisEngine::createTextAnalysisEngine((UnicodeStringRef(ufn).asUTF8().c_str()),
          errInfo);
        if (pEngine) {
          LOGINFO(0,"AMQAnalysisEngineService::initialize() AnalysisEngine initialization successful.");
        } else {
          LOGERROR("AMQAnalysisEngineService::initializer() could not create AE" + errInfo.asString());
          ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
          // errInfo on the next line is still the outer object filled in by
          // createTextAnalysisEngine(); the local one is declared after it.
          msg.addParam("AMQAnalysisEngineService::initialize() create AE failed. " + errInfo.getMessage().asString() );
          ErrorInfo errInfo;
          errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE);
          errInfo.setMessage(msg);
          UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
            UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
            errInfo.getMessage(),
            errInfo.getMessage().getMessageID(),
            ErrorInfo::unrecoverable);
        }

        this->iv_vecpAnalysisEngines.push_back(pEngine);

        //initial FSHeap size: only overridden when a positive value is configured,
        //and set before the CAS is requested from the engine
        if (this->iv_initialFSHeapSize > 0) {
          pEngine->getAnnotatorContext().
            assignValue(UIMA_ENGINE_CONFIG_OPTION_FSHEAP_PAGESIZE,this->iv_initialFSHeapSize);
        }

        //create CAS
        CAS * cas = pEngine->newCAS();
        if (cas == NULL) {
          LOGERROR("AMQAnalysisEngineService::initialize() Could not create CAS");
          ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
          msg.addParam("AMQListener::initialize() create CAS failed.");
          ErrorInfo errInfo;
          errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE);
          errInfo.setMessage(msg);
          UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
            UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
            errInfo.getMessage(),
            errInfo.getMessage().getMessageID(),
            ErrorInfo::unrecoverable);
        }
        this->iv_vecpCas.push_back(cas);

        //create listeners and register these
        AMQListener * newListener = new AMQListener(i,iv_vecpConnections.at(i),
          iv_vecpAnalysisEngines.at(i), iv_vecpCas.at(i),
          iv_pMonitor);
        // See the note on `new` above: defensive check only.
        if (newListener == NULL) {
          LOGERROR("AMQAnalysisEngineService::initialize() Could not create listener.");
          ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
          msg.addParam("AnalysisEngineService::initialize() Could not create listener.");
          ErrorInfo errInfo;
          errInfo.setMessage(msg);
          UIMA_EXC_THROW_NEW(Uima_runtime_error, 
            UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
            errInfo.getMessage(),
            errInfo.getMessage().getMessageID(),
            ErrorInfo::unrecoverable);
        }
        this->iv_listeners[i] = newListener;

        //create MessageConsumer session and register Listener
        this->iv_vecpConnections.at(i)->createMessageConsumer(iv_inputQueueName,annotator_selector, params.getPrefetchSize());
      }

      iv_pMonitor->setNumberOfInstances(iv_numInstances);

      //Fast GetMeta
      //create connection; it takes the id following the last instance id
      LOGINFO(FINEST, "AMQAnalysisEngineService::initialize() Setup GETMETA instance.");
      iv_pgetMetaConnection = new AMQConnection(this->iv_pConnFact, params.getBrokerURL(), this->iv_pMonitor, iv_numInstances);

      // See the note on `new` above: defensive check only.
      if (iv_pgetMetaConnection == NULL) {
        LOGERROR("AMQAnalysisEngineService::initialize() Could not create fast getmeta ActiveMQ endpoint connection.");
        ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
        msg.addParam("AMQAnalysisEngineService::initialize() Failed to connect to broker.");
        ErrorInfo errInfo;
        errInfo.setMessage(msg);
        UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
          UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
          errInfo.getMessage(),
          errInfo.getMessage().getMessageID(),
          ErrorInfo::unrecoverable);
      }

      //create a MessageListener MessageConsumer to handle getMeta requests only.
      // NOTE(review): at(0) requires at least one engine; presumably
      // iv_vecpAnalysisEngines is a std::vector, in which case an empty
      // container (e.g. iv_numInstances == 0) raises std::out_of_range, which
      // is not a uima::Exception and bypasses the handler below - confirm
      // that callers guarantee iv_numInstances >= 1.
      AMQListener * newListener = new AMQListener(iv_numInstances,iv_pgetMetaConnection,
        iv_vecpAnalysisEngines.at(0), 
        this->iv_pMonitor);
      // See the note on `new` above: defensive check only.
      if (newListener == NULL) {
        LOGERROR("AMQAnalysisEngineService::initialize() Could not create Fast getMeta listener.");
        ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
        msg.addParam("AMQAnalysisEngineService::initialize() Could not create listener.");
        ErrorInfo errInfo;
        errInfo.setMessage(msg);
        UIMA_EXC_THROW_NEW(Uima_runtime_error, 
          UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
          errInfo.getMessage(),
          errInfo.getMessage().getMessageID(),
          ErrorInfo::unrecoverable);
      }
      this->iv_listeners[this->iv_numInstances] = newListener; 
      // Prefetch size 0 is passed for the getMeta consumer.
      iv_pgetMetaConnection->createMessageConsumer(iv_inputQueueName, 
                      getmeta_selector,0); 
      this->iv_pMonitor->setGetMetaListenerId(iv_numInstances);
    } catch (uima::Exception & e) {
      // Caught by reference and rethrown with a bare `throw;` so the exception
      // keeps its dynamic type (e.g. Uima_runtime_error) instead of being
      // sliced down to a uima::Exception copy.
      cout << __FILE__ << __LINE__ <<  "AMQAnalysisEngineService::initialize() failed " << e.getErrorInfo().asString() << endl;
      LOGERROR("AMQAnalysisEngineService::initialize() " + e.asString());
      throw;
    }
  }