void AMQConnection::initialize()

in src/utils/ActiveMQAnalysisEngineService.cpp [117:250]


 void AMQConnection::initialize( ) {

    try {
  
      LOGINFO(INFO, "AMQConnection() connecting to " + iv_brokerURL);
      
      // Create a Connection
      if (iv_pConnFact == NULL) {	
        LOGERROR("AMQConnection() invalid connection factory");
        ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
        msg.addParam("AMQConnection() invalid create connection factory");
        ErrorInfo errInfo;
        errInfo.setMessage(msg);
        UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
          UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
          errInfo.getMessage(),
          errInfo.getMessage().getMessageID(),
          ErrorInfo::unrecoverable);
      }

      bool retrying = false;
      while (iv_pConnection == NULL) {
        try {
          iv_pConnection = iv_pConnFact->createConnection();
          if (iv_pConnection == NULL ) {
            if (!retrying) {
              stringstream str;
              str << " AMQConnection::initialize() Connection object is null. Failed to connect to " << iv_brokerURL
                         << ". Retrying..." << endl;
              LOGWARN(str.str());
              retrying = true;
              apr_sleep(30000000); //wait 30 seconds to reconnect
            }
          }
        } catch (cms::CMSException& e) {
          if (!retrying) {
            stringstream str;
            str << "AMQConnection::initialize() Failed to connect to " << iv_brokerURL
                         << e.getMessage() << ". Retrying..." << endl;
			cout << e.getMessage() << endl;
            LOGWARN(str.str());
            retrying = true;
            apr_sleep(30000000); //wait 30 seconds to reconnect
          }
        } 
      }
     
      if (retrying) {
        LOGWARN("AMQConnection::initialize() Connected to " + iv_brokerURL);
      }

      //default exception listener
      this->iv_pConnection->setExceptionListener(this);	
      ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( this->iv_pConnection );
      if( amqConnection != NULL ) {
        amqConnection->addTransportListener( this );
      }
      // Create a Producer Session
      LOGINFO(FINEST,"AMQConnection() create Producer Session " + iv_brokerURL);
      this->iv_pProducerSession = this->iv_pConnection->createSession( Session::AUTO_ACKNOWLEDGE );

      if  (this->iv_pProducerSession == NULL) {
        LOGERROR("AMQConnection() createSession() failed."); 
        ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
        msg.addParam("AMQConnection() createSession failed." );
        ErrorInfo errInfo;
        errInfo.setMessage(msg);
        UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
          UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
          errInfo.getMessage(),
          errInfo.getMessage().getMessageID(),
          ErrorInfo::unrecoverable);
      } 
      this->iv_pProducer = this->iv_pProducerSession->createProducer(NULL);
      if (this->iv_pProducer == NULL) {
        LOGERROR("AMQConnection() could not create MessageProducer ");
        ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
        msg.addParam("AMQConnection() create MessageProducer failed.");
        ErrorInfo errInfo;
        errInfo.setMessage(msg);
        UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
          UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
          errInfo.getMessage(),
          errInfo.getMessage().getMessageID(),
          ErrorInfo::unrecoverable);
      }
      this->iv_pProducer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
	  
	  //TODO set sendTimeout ?
      ////((ActiveMQProducer*)this->iv_pProducer)->setSendTimeout(3000);
      
	  //create TextMessage
      this->iv_pReplyMessage = this->iv_pProducerSession->createTextMessage();
      if (this->iv_pReplyMessage == NULL) {
        LOGERROR("AMQConnection() create textMessage failed. ");
        ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
        msg.addParam("AMQConnection() failed to create message.");
        ErrorInfo errInfo;
        errInfo.setMessage(msg);
        UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
          UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
          errInfo.getMessage(),
          errInfo.getMessage().getMessageID(),
          ErrorInfo::unrecoverable);
      }
      this->iv_valid = true;
      LOGINFO(0, "AMQConnection() connected successfully to " + iv_brokerURL);
    } catch (cms::CMSException& e) {
      LOGERROR("AMQConnection(): " + e.getMessage());  
      ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
      msg.addParam(e.getMessage());
      ErrorInfo errInfo;
      errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE);
      errInfo.setMessage(msg);
      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
        UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
        errInfo.getMessage(),
        errInfo.getMessage().getMessageID(),
        ErrorInfo::unrecoverable);
    } catch (...) {
      cout << "... " << endl;
      LOGERROR("AMQConnection() failed to create a connection");  
      ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
      msg.addParam("AMQConnection() create connection failed");
      ErrorInfo errInfo;
      errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE);
      errInfo.setMessage(msg);
      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, 
        UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
        errInfo.getMessage(),
        errInfo.getMessage().getMessageID(),
        ErrorInfo::unrecoverable);
    }
  }