in axis-rt-transport-jms/src/main/java/org/apache/axis/transport/jms/JMSConnector.java [211:415]
/**
 * Creates an endpoint from a destination name. The concrete connector
 * supplies the implementation.
 *
 * NOTE(review): how the name is resolved to a JMS destination is not
 * visible from this declaration -- confirm against the implementing
 * connectors.
 *
 * @param destinationName name identifying the destination
 * @return the endpoint created for that name
 * @throws JMSException declared by the contract; the conditions are
 *         defined by the implementing connector
 */
public abstract JMSEndpoint createEndpoint(String destinationName)
throws JMSException;
/**
 * Creates an endpoint from a destination object. The concrete connector
 * supplies the implementation.
 *
 * NOTE(review): presumably the counterpart of the String overload for
 * callers that already hold a Destination -- confirm against the
 * implementing connectors.
 *
 * @param destination the destination the endpoint is created for
 * @return the endpoint created for that destination
 * @throws JMSException declared by the contract; the conditions are
 *         defined by the implementing connector
 */
public abstract JMSEndpoint createEndpoint(Destination destination)
throws JMSException;
/**
 * Opens a new JMS connection. The inner Connection thread calls this from
 * its run loop whenever it needs to (re)connect, passing the factory and
 * credentials it was constructed with, and stores the result as its
 * current connection.
 *
 * @param connectionFactory factory to create the connection from
 * @param username user name handed through from the Connection thread
 *        (NOTE(review): may presumably be null for anonymous access --
 *        confirm against implementations)
 * @param password password handed through from the Connection thread
 * @return the newly created connection; the caller dereferences it
 *         immediately, so implementations should not return null
 * @throws JMSException if the connection cannot be created; the caller
 *         backs off and retries
 */
protected abstract javax.jms.Connection internalConnect(
ConnectionFactory connectionFactory,
String username,
String password)
throws JMSException;
/**
 * Thread that owns one JMS connection and keeps it established.
 *
 * The run loop connects (unless a live connection was handed to the
 * constructor), lets the subclass build its context through
 * {@link #onConnect()}, and then parks until it is told that the connection
 * was lost ({@link #onException(JMSException)}) or that it should terminate
 * ({@link #shutdown()}). After a loss it discards the old connection and
 * starts over.
 *
 * Locking: m_jmsLock guards the park/wake handshake of the run loop,
 * m_lifecycleLock guards the requested started/stopped state.
 */
private abstract class Connection extends Thread implements ExceptionListener
{
    private final ConnectionFactory m_connectionFactory;
    protected javax.jms.Connection m_connection;
    // volatile: cleared by shutdown() on a foreign thread, polled by run()
    protected volatile boolean m_isActive;
    // only touched by the constructor and by the run() thread
    private boolean m_needsToConnect;
    // requested started/stopped state; guarded by m_lifecycleLock
    private boolean m_startConnection;
    // set when a non-recoverable exception was reported; guarded by m_jmsLock
    private boolean m_connectionLost;
    private final String m_clientID;
    private final String m_username;
    private final String m_password;
    private final Object m_jmsLock;
    private final Object m_lifecycleLock;

    /**
     * @param connectionFactory factory used for every (re)connect
     * @param connection an already established connection to adopt, or
     *        null to have the thread create one itself
     * @param threadName name given to this thread
     * @param clientID JMS client identifier to set on each connection, or
     *        null to leave it unset
     * @param username user name passed to internalConnect
     * @param password password passed to internalConnect
     * @throws JMSException if the adopted connection rejects the client ID
     *         or the exception listener
     */
    protected Connection(ConnectionFactory connectionFactory,
                         javax.jms.Connection connection,
                         String threadName,
                         String clientID,
                         String username,
                         String password)
        throws JMSException
    {
        super(threadName);
        m_connectionFactory = connectionFactory;
        m_clientID = clientID;
        m_username = username;
        m_password = password;
        m_jmsLock = new Object();
        m_lifecycleLock = new Object();

        if (connection != null)
        {
            m_needsToConnect = false;
            m_connection = connection;
            configure(m_connection);
        }
        else
        {
            m_needsToConnect = true;
        }

        m_isActive = true;
    }

    public ConnectionFactory getConnectionFactory()
    {
        return m_connectionFactory;
    }

    public String getClientID()
    {
        return m_clientID;
    }

    public String getUsername()
    {
        return m_username;
    }

    public String getPassword()
    {
        return m_password;
    }

    /**
     * Keeps the connection established until {@link #shutdown()} is called,
     * then releases it.
     *
     * @todo handle non-recoverable errors
     */
    public void run()
    {
        // loop until a connection is made and when a connection is made
        // (re)establish any subscriptions
        while (m_isActive)
        {
            if (m_needsToConnect)
            {
                // Release whatever we held before: either the connection that
                // was lost or one whose setup failed half way. Dropping the
                // reference without close() would leak it.
                javax.jms.Connection stale = m_connection;
                m_connection = null;
                closeQuietly(stale);

                synchronized (m_jmsLock)
                {
                    // failures reported so far concern the discarded connection
                    m_connectionLost = false;
                }

                try
                {
                    m_connection = internalConnect(m_connectionFactory,
                                                   m_username, m_password);
                    configure(m_connection);
                }
                catch (JMSException e)
                {
                    // simply backoff for a while and then retry
                    backoff();
                    continue;
                }
            }
            else
            {
                // The next time we get here it will be because we lost the
                // connection that was handed to the constructor.
                m_needsToConnect = true;
            }

            // we now have a valid connection so establish some context
            try
            {
                internalOnConnect();
            }
            catch (Exception e)
            {
                // insert code to handle non recoverable errors
                // Retry with a fresh connection, but back off first so that a
                // persistent failure does not turn into a tight reconnect loop.
                backoff();
                continue;
            }

            synchronized (m_jmsLock)
            {
                // Re-check the state under the lock: shutdown() or
                // onException() may already have fired while the context was
                // being set up, and their notifyAll() would otherwise be lost
                // and leave this thread parked forever.
                if (m_isActive && !m_connectionLost)
                {
                    // Any wake-up (notify or interrupt) is treated as a
                    // status change and sends us round the loop again.
                    try { m_jmsLock.wait(); } catch (InterruptedException ie) { }
                }
            }
        }

        // no longer staying connected, so see what we can cleanup
        internalOnShutdown();
    }

    /**
     * Requests that the connection be started. Does nothing if a start was
     * already requested. The request is remembered so that a connection
     * established later is started as well.
     */
    void startConnection()
    {
        synchronized (m_lifecycleLock)
        {
            if (m_startConnection)
                return;
            m_startConnection = true;
            startQuietly();
        }
    }

    /**
     * Requests that the connection be stopped. Does nothing unless a start
     * was requested before.
     */
    void stopConnection()
    {
        synchronized (m_lifecycleLock)
        {
            if (!m_startConnection)
                return;
            m_startConnection = false;
            javax.jms.Connection current = m_connection;
            if (current != null)
            {
                try { current.stop(); } catch (Throwable ignored) { } // best effort
            }
        }
    }

    /**
     * Asks the run loop to terminate; the loop itself performs the cleanup.
     */
    void shutdown()
    {
        m_isActive = false;
        synchronized (m_jmsLock)
        {
            m_jmsLock.notifyAll();
        }
    }

    /**
     * JMS callback for asynchronously reported connection problems.
     * Exceptions the vendor adapter classifies as recoverable are ignored;
     * anything else is passed to the subclass hook and wakes the run loop
     * so that it reconnects.
     *
     * @param exception the problem reported by the JMS provider
     */
    public void onException(JMSException exception)
    {
        if (m_adapter.isRecoverable(exception,
                                    JMSVendorAdapter.ON_EXCEPTION_ACTION))
            return;
        onException();
        synchronized (m_jmsLock)
        {
            // remember the loss in case the run loop is not parked yet
            m_connectionLost = true;
            m_jmsLock.notifyAll();
        }
    }

    /**
     * Runs the subclass hook and then starts the connection if a start has
     * been requested.
     */
    private final void internalOnConnect()
        throws Exception
    {
        onConnect();
        synchronized (m_lifecycleLock)
        {
            if (m_startConnection)
            {
                startQuietly();
            }
        }
    }

    /**
     * Stops the connection, lets the subclass clean up and closes the
     * connection.
     */
    private final void internalOnShutdown()
    {
        stopConnection();
        onShutdown();
        closeQuietly(m_connection);
    }

    /**
     * Applies the per-connection settings. The client ID goes first: the JMS
     * API requires it to be set before any other action on the connection.
     */
    private void configure(javax.jms.Connection connection)
        throws JMSException
    {
        if (m_clientID != null)
            connection.setClientID(m_clientID);
        connection.setExceptionListener(this);
    }

    /** Starts the current connection, if there is one; failures are ignored. */
    private void startQuietly()
    {
        javax.jms.Connection current = m_connection;
        if (current == null)
            return;
        try { current.start(); } catch (Throwable ignored) { } // best effort
    }

    /** Closes the given connection, if there is one; failures are ignored. */
    private void closeQuietly(javax.jms.Connection connection)
    {
        if (connection == null)
            return;
        try { connection.close(); } catch (Throwable ignored) { } // best effort
    }

    /**
     * Sleeps for the connector's retry interval unless shutdown was
     * requested. An interrupt merely cuts the pause short; termination is
     * signalled through shutdown(), so the interrupt is not propagated.
     */
    private void backoff()
    {
        if (!m_isActive)
            return;
        try { Thread.sleep(m_connectRetryInterval); } catch (InterruptedException ie) { }
    }

    /** Called on the connection thread each time a connection is available. */
    protected abstract void onConnect() throws Exception;

    /** Called on the connection thread once the run loop has ended. */
    protected abstract void onShutdown();

    /** Called when a non-recoverable JMS exception has been reported. */
    protected abstract void onException();
}