in src/utils/ActiveMQAnalysisEngineService.cpp [117:250]
/**
 * Connects to the broker at iv_brokerURL and sets up the producer side of
 * this connection.
 *
 * Steps, in order:
 *   1. Verify that a connection factory is available.
 *   2. Call createConnection() until it yields a non-null connection,
 *      waiting 30 seconds after every failed attempt. This loop has no
 *      upper bound on the number of attempts.
 *   3. Register this object as the connection's exception listener and,
 *      when the connection is an ActiveMQConnection, as a transport listener.
 *   4. Create the producer session, the message producer (non-persistent
 *      delivery) and the reusable reply text message.
 *   5. Set iv_valid to true.
 *
 * @throws uima::Uima_runtime_error with error id
 *         UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE when the factory is
 *         missing, when any of the session / producer / message objects
 *         cannot be created, or when a CMS (or any other) exception is
 *         raised outside the connect-retry loop.
 */
void AMQConnection::initialize( ) {
  try {
    LOGINFO(INFO, "AMQConnection() connecting to " + iv_brokerURL);
    // Create a Connection
    if (iv_pConnFact == NULL) {
      LOGERROR("AMQConnection() invalid connection factory");
      ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
      msg.addParam("AMQConnection() invalid create connection factory");
      ErrorInfo errInfo;
      errInfo.setMessage(msg);
      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                         UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                         errInfo.getMessage(),
                         errInfo.getMessage().getMessageID(),
                         ErrorInfo::unrecoverable);
    }

    // Keep trying until the broker hands back a connection. The failure is
    // logged only once (on the first failed attempt), but the back-off sleep
    // is applied after EVERY failed attempt so an unreachable broker does not
    // turn this loop into a busy spin.
    bool retrying = false;
    while (iv_pConnection == NULL) {
      try {
        iv_pConnection = iv_pConnFact->createConnection();
        if (iv_pConnection == NULL && !retrying) {
          stringstream str;
          str << " AMQConnection::initialize() Connection object is null. Failed to connect to " << iv_brokerURL
              << ". Retrying..." << endl;
          LOGWARN(str.str());
        }
      } catch (cms::CMSException& e) {
        // createConnection() threw, so iv_pConnection is still NULL here.
        if (!retrying) {
          stringstream str;
          str << "AMQConnection::initialize() Failed to connect to " << iv_brokerURL
              << ": " << e.getMessage() << ". Retrying..." << endl;
          cout << e.getMessage() << endl;
          LOGWARN(str.str());
        }
      }
      if (iv_pConnection == NULL) {
        retrying = true;
        apr_sleep(30000000); //wait 30 seconds (value is in microseconds) to reconnect
      }
    }
    if (retrying) {
      // Logged at warning level so the recovery shows up next to the
      // earlier failure warning.
      LOGWARN("AMQConnection::initialize() Connected to " + iv_brokerURL);
    }

    //default exception listener
    this->iv_pConnection->setExceptionListener(this);
    // Transport listener registration is only possible on the ActiveMQ
    // implementation of the CMS connection.
    ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( this->iv_pConnection );
    if( amqConnection != NULL ) {
      amqConnection->addTransportListener( this );
    }

    // Create a Producer Session
    LOGINFO(FINEST,"AMQConnection() create Producer Session " + iv_brokerURL);
    this->iv_pProducerSession = this->iv_pConnection->createSession( Session::AUTO_ACKNOWLEDGE );
    if (this->iv_pProducerSession == NULL) {
      LOGERROR("AMQConnection() createSession() failed.");
      ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
      msg.addParam("AMQConnection() createSession failed." );
      ErrorInfo errInfo;
      errInfo.setMessage(msg);
      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                         UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                         errInfo.getMessage(),
                         errInfo.getMessage().getMessageID(),
                         ErrorInfo::unrecoverable);
    }

    // Producer is created without a fixed destination (NULL).
    this->iv_pProducer = this->iv_pProducerSession->createProducer(NULL);
    if (this->iv_pProducer == NULL) {
      LOGERROR("AMQConnection() could not create MessageProducer ");
      ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
      msg.addParam("AMQConnection() create MessageProducer failed.");
      ErrorInfo errInfo;
      errInfo.setMessage(msg);
      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                         UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                         errInfo.getMessage(),
                         errInfo.getMessage().getMessageID(),
                         ErrorInfo::unrecoverable);
    }
    this->iv_pProducer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
    //TODO set sendTimeout ?
    ////((ActiveMQProducer*)this->iv_pProducer)->setSendTimeout(3000);

    //create TextMessage
    this->iv_pReplyMessage = this->iv_pProducerSession->createTextMessage();
    if (this->iv_pReplyMessage == NULL) {
      LOGERROR("AMQConnection() create textMessage failed. ");
      ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
      msg.addParam("AMQConnection() failed to create message.");
      ErrorInfo errInfo;
      errInfo.setMessage(msg);
      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                         UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                         errInfo.getMessage(),
                         errInfo.getMessage().getMessageID(),
                         ErrorInfo::unrecoverable);
    }

    this->iv_valid = true;
    LOGINFO(0, "AMQConnection() connected successfully to " + iv_brokerURL);
  } catch (cms::CMSException& e) {
    LOGERROR("AMQConnection(): " + e.getMessage());
    ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
    msg.addParam(e.getMessage());
    ErrorInfo errInfo;
    errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE);
    errInfo.setMessage(msg);
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       errInfo.getMessage(),
                       errInfo.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  } catch (uima::Uima_runtime_error&) {
    // Raised (and already logged) by one of the checks above. Pass it on
    // untouched so the catch-all below does not replace its specific
    // message with a generic one.
    throw;
  } catch (...) {
    cout << "... " << endl;
    LOGERROR("AMQConnection() failed to create a connection");
    ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
    msg.addParam("AMQConnection() create connection failed");
    ErrorInfo errInfo;
    errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE);
    errInfo.setMessage(msg);
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       errInfo.getMessage(),
                       errInfo.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  }
}